We provide all the information about MCP servers via our MCP API.
curl -X GET 'https://glama.ai/api/mcp/v1/servers/basicmachines-co/basic-memory'
If you have feedback or need assistance with the MCP directory API, please join our Discord server.
"""Test general sync behavior."""
import asyncio
from datetime import datetime, timezone
from pathlib import Path
from textwrap import dedent
import pytest
from basic_memory.config import ProjectConfig, BasicMemoryConfig
from basic_memory.models import Entity
from basic_memory.repository import EntityRepository
from basic_memory.schemas.search import SearchQuery
from basic_memory.services import EntityService, FileService
from basic_memory.services.search_service import SearchService
from basic_memory.sync.sync_service import SyncService
from basic_memory.utils import generate_permalink
async def create_test_file(path: Path, content: str = "test content") -> None:
    """Write *content* to *path*, creating any missing parent directories."""
    parent_dir = path.parent
    parent_dir.mkdir(parents=True, exist_ok=True)
    path.write_text(content)
async def touch_file(path: Path) -> None:
    """Touch a file to update its mtime (for watermark testing)."""
    import time

    # Rewrite the existing content so the filesystem assigns a fresh mtime.
    original = path.read_text()
    # The 500ms pause guarantees the new mtime is strictly newer than the watermark.
    time.sleep(0.5)
    path.write_text(original)
async def force_full_scan(sync_service: SyncService) -> None:
    """Force next sync to do a full scan by clearing watermark (for testing moves/deletions)."""
    project_id = sync_service.entity_repository.project_id
    if project_id is None:
        return

    project = await sync_service.project_repository.find_by_id(project_id)
    if project is None:
        return

    # Null out both watermark fields so the next sync cannot short-circuit
    # into an incremental scan.
    await sync_service.project_repository.update(
        project.id,
        {"last_scan_timestamp": None, "last_file_count": None},
    )
@pytest.mark.asyncio
async def test_forward_reference_resolution(
    sync_service: SyncService,
    project_config: ProjectConfig,
    entity_service: EntityService,
):
    """Test that forward references get resolved when target file is created.

    A [[wikilink]] to a not-yet-existing file is first stored as an unresolved
    relation (to_id is None); a later sync after the target file appears must
    resolve it to the real entity id. Duplicate wikilinks in the same file
    must collapse into a single relation.
    """
    project_dir = project_config.home

    # First create a file with a forward reference
    source_content = """
---
type: knowledge
---
# Source Document
## Relations
- depends_on [[target-doc]]
- depends_on [[target-doc]] # duplicate
"""
    await create_test_file(project_dir / "source.md", source_content)

    # Initial sync - should create forward reference
    await sync_service.sync(project_config.home)

    # Verify forward reference: one relation (duplicate collapsed), no target id yet
    project_prefix = generate_permalink(project_config.name)
    source = await entity_service.get_by_permalink(f"{project_prefix}/source")
    assert len(source.relations) == 1
    assert source.relations[0].to_id is None
    assert source.relations[0].to_name == "target-doc"

    # Now create the target file
    target_content = """
---
type: knowledge
---
# Target Doc
Target content
"""
    target_file = project_dir / "target_doc.md"
    await create_test_file(target_file, target_content)

    # Force full scan to ensure the new file is detected
    # Incremental scans have timing precision issues with watermarks on some filesystems
    await force_full_scan(sync_service)

    # Sync again - should resolve the reference
    await sync_service.sync(project_config.home)

    # Verify reference is now resolved to the target entity's id and title
    source = await entity_service.get_by_permalink(f"{project_prefix}/source")
    target = await entity_service.get_by_permalink(f"{project_prefix}/target-doc")
    assert len(source.relations) == 1
    assert source.relations[0].to_id == target.id
    assert source.relations[0].to_name == target.title
@pytest.mark.asyncio
async def test_resolve_relations_deletes_duplicate_unresolved_relation(
    sync_service: SyncService,
    project_config: ProjectConfig,
    entity_service: EntityService,
):
    """Test that resolve_relations deletes duplicate unresolved relations on IntegrityError.

    When resolving a forward reference would create a duplicate (from_id, to_id, relation_type),
    the unresolved relation should be deleted since a resolved version already exists.
    """
    # Local import: Relation is only needed by this test.
    from basic_memory.models import Relation

    project_dir = project_config.home

    # Create source entity
    source_content = """
---
type: knowledge
---
# Source Entity
Content
"""
    await create_test_file(project_dir / "source.md", source_content)

    # Create target entity
    target_content = """
---
type: knowledge
title: Target Entity
---
# Target Entity
Content
"""
    await create_test_file(project_dir / "target.md", target_content)

    # Sync to create both entities
    await sync_service.sync(project_config.home)
    project_prefix = generate_permalink(project_config.name)
    source = await entity_service.get_by_permalink(f"{project_prefix}/source")
    target = await entity_service.get_by_permalink(f"{project_prefix}/target")

    # Create a resolved relation (already exists) that the unresolved one would become.
    resolved_relation = Relation(
        from_id=source.id,
        to_id=target.id,
        to_name=target.title,
        relation_type="relates_to",
    )
    await sync_service.relation_repository.add(resolved_relation)

    # Create an unresolved relation that will resolve to target
    unresolved_relation = Relation(
        from_id=source.id,
        to_id=None,  # Unresolved
        to_name="target",  # Will resolve to target entity
        relation_type="relates_to",
    )
    await sync_service.relation_repository.add(unresolved_relation)
    unresolved_id = unresolved_relation.id

    # Verify we have the unresolved relation
    source = await entity_service.get_by_permalink(f"{project_prefix}/source")
    unresolved_outgoing = [r for r in source.outgoing_relations if r.to_id is None]
    assert len(unresolved_outgoing) == 1
    assert unresolved_outgoing[0].id == unresolved_id
    assert unresolved_outgoing[0].to_name == "target"

    # Call resolve_relations - should hit a real IntegrityError (unique constraint) and delete
    # the duplicate unresolved relation.
    await sync_service.resolve_relations()

    # Verify the unresolved relation was deleted
    deleted = await sync_service.relation_repository.find_by_id(unresolved_id)
    assert deleted is None

    # Verify no unresolved relations remain
    unresolved = await sync_service.relation_repository.find_unresolved_relations()
    assert len(unresolved) == 0

    # Verify only the resolved relation remains
    source = await entity_service.get_by_permalink(f"{project_prefix}/source")
    assert len(source.outgoing_relations) == 1
    assert source.outgoing_relations[0].to_id == target.id
@pytest.mark.asyncio
async def test_sync(
    sync_service: SyncService, project_config: ProjectConfig, entity_service: EntityService
):
    """Test basic knowledge sync functionality.

    Sync must create entities from files on disk, remove DB entities whose
    files no longer exist, and keep relations to removed targets as
    unresolved forward links.
    """
    # Create test files
    project_dir = project_config.home

    # New entity with relation
    new_content = """
---
type: knowledge
permalink: concept/test-concept
created: 2023-01-01
modified: 2023-01-01
---
# Test Concept
A test concept.
## Observations
- [design] Core feature
## Relations
- depends_on [[concept/other]]
"""
    await create_test_file(project_dir / "concept/test_concept.md", new_content)

    # Create related entity in DB that will be deleted
    # because file was not found
    other = Entity(
        permalink="concept/other",
        title="Other",
        note_type="test",
        file_path="concept/other.md",
        checksum="12345678",
        content_type="text/markdown",
        created_at=datetime.now(timezone.utc),
        updated_at=datetime.now(timezone.utc),
    )
    await entity_service.repository.add(other)

    # Run sync
    await sync_service.sync(project_config.home)

    # Verify results: only the on-disk entity survives ("concept/other" had no file)
    entities = await entity_service.repository.find_all()
    assert len(entities) == 1

    # Find new entity
    test_concept = next(e for e in entities if e.permalink == "concept/test-concept")
    assert test_concept.note_type == "knowledge"

    # Verify relation was created
    # with forward link (target was deleted, so only to_name is recorded)
    entity = await entity_service.get_by_permalink(test_concept.permalink)
    relations = entity.relations
    assert len(relations) == 1, "Expected 1 relation for entity"
    assert relations[0].to_name == "concept/other"
@pytest.mark.asyncio
async def test_sync_hidden_file(
    sync_service: SyncService, project_config: ProjectConfig, entity_service: EntityService
):
    """Dotfiles are ignored: syncing a hidden markdown file creates no entities."""
    hidden_path = project_config.home / "concept/.hidden.md"
    await create_test_file(hidden_path, "hidden")

    await sync_service.sync(project_config.home)

    # Hidden files must not be indexed at all.
    all_entities = await entity_service.repository.find_all()
    assert len(all_entities) == 0
@pytest.mark.asyncio
async def test_sync_entity_with_nonexistent_relations(
    sync_service: SyncService, project_config: ProjectConfig
):
    """Test syncing an entity that references nonexistent entities."""
    project_dir = project_config.home

    # Create entity that references entities we haven't created yet
    content = """
---
type: knowledge
permalink: concept/depends-on-future
created: 2024-01-01
modified: 2024-01-01
---
# Test Dependencies
## Observations
- [design] Testing future dependencies
## Relations
- depends_on [[concept/not_created_yet]]
- uses [[concept/also_future]]
"""
    await create_test_file(project_dir / "concept/depends_on_future.md", content)

    # Sync
    await sync_service.sync(project_config.home)

    # Verify entity created with both relations kept as unresolved forward
    # references (the targets don't exist, so only to_name is recorded)
    entity = await sync_service.entity_service.repository.get_by_permalink(
        "concept/depends-on-future"
    )
    assert entity is not None
    assert len(entity.relations) == 2
    assert entity.relations[0].to_name == "concept/not_created_yet"
    assert entity.relations[1].to_name == "concept/also_future"
@pytest.mark.asyncio
async def test_sync_entity_circular_relations(
    sync_service: SyncService, project_config: ProjectConfig
):
    """Test syncing entities with circular dependencies.

    A depends on B and B depends on A; both directions must resolve so each
    entity ends up with one outgoing and one incoming relation.
    """
    project_dir = project_config.home

    # Create entity A that depends on B
    content_a = """
---
type: knowledge
permalink: concept/entity-a
created: 2024-01-01
modified: 2024-01-01
---
# Entity A
## Observations
- First entity in circular reference
## Relations
- depends_on [[concept/entity-b]]
"""
    await create_test_file(project_dir / "concept/entity_a.md", content_a)

    # Create entity B that depends on A
    content_b = """
---
type: knowledge
permalink: concept/entity-b
created: 2024-01-01
modified: 2024-01-01
---
# Entity B
## Observations
- Second entity in circular reference
## Relations
- depends_on [[concept/entity-a]]
"""
    await create_test_file(project_dir / "concept/entity_b.md", content_b)

    # Sync
    await sync_service.sync(project_config.home)

    # Verify both entities and their relations
    entity_a = await sync_service.entity_service.repository.get_by_permalink("concept/entity-a")
    entity_b = await sync_service.entity_service.repository.get_by_permalink("concept/entity-b")

    # outgoing relations
    assert len(entity_a.outgoing_relations) == 1
    assert len(entity_b.outgoing_relations) == 1

    # incoming relations
    assert len(entity_a.incoming_relations) == 1
    assert len(entity_b.incoming_relations) == 1

    # all relations (outgoing + incoming combined)
    assert len(entity_a.relations) == 2
    assert len(entity_b.relations) == 2

    # Verify circular reference works: each side resolved to the other's id
    a_relation = entity_a.outgoing_relations[0]
    assert a_relation.to_id == entity_b.id
    b_relation = entity_b.outgoing_relations[0]
    assert b_relation.to_id == entity_a.id
@pytest.mark.asyncio
async def test_sync_entity_duplicate_relations(
    sync_service: SyncService, project_config: ProjectConfig
):
    """Test handling of duplicate relations in an entity.

    Repeated wikilinks with the same relation type must collapse into one
    relation; the same target under a different type is kept separately.
    """
    project_dir = project_config.home

    # Create target entity first
    target_content = """
---
type: knowledge
permalink: concept/target
created: 2024-01-01
modified: 2024-01-01
---
# Target Entity
## Observations
- something to observe
"""
    await create_test_file(project_dir / "concept/target.md", target_content)

    # Create entity with duplicate relations
    content = """
---
type: knowledge
permalink: concept/duplicate-relations
created: 2024-01-01
modified: 2024-01-01
---
# Test Duplicates
## Observations
- this has a lot of relations
## Relations
- depends_on [[concept/target]]
- depends_on [[concept/target]] # Duplicate
- uses [[concept/target]] # Different relation type
- uses [[concept/target]] # Duplicate of different type
"""
    await create_test_file(project_dir / "concept/duplicate_relations.md", content)

    # Sync
    await sync_service.sync(project_config.home)

    # Verify duplicates are handled
    entity = await sync_service.entity_service.repository.get_by_permalink(
        "concept/duplicate-relations"
    )

    # Count relations by type
    relation_counts = {}
    for rel in entity.relations:
        relation_counts[rel.relation_type] = relation_counts.get(rel.relation_type, 0) + 1

    # Should only have one of each type
    assert relation_counts["depends_on"] == 1
    assert relation_counts["uses"] == 1
@pytest.mark.asyncio
async def test_sync_entity_with_random_categories(
    sync_service: SyncService, project_config: ProjectConfig
):
    """Test handling of random observation categories.

    Arbitrary bracketed category names are accepted and preserved; bullet
    lines without a [category] prefix are not treated as observations.
    """
    project_dir = project_config.home

    content = """
---
type: knowledge
permalink: concept/invalid-category
created: 2024-01-01
modified: 2024-01-01
---
# Test Categories
## Observations
- [random category] This is fine
- [ a space category] Should default to note
- This one is not an observation, should be ignored
- [design] This is valid
"""
    await create_test_file(project_dir / "concept/invalid_category.md", content)

    # Sync
    await sync_service.sync(project_config.home)

    # Verify observations: the plain bullet without a [category] is ignored,
    # leaving three observations
    entity = await sync_service.entity_service.repository.get_by_permalink(
        "concept/invalid-category"
    )
    assert len(entity.observations) == 3
    categories = [obs.category for obs in entity.observations]

    # Arbitrary category names are preserved as written
    assert "random category" in categories
    # Surrounding whitespace is trimmed from the category name
    assert "a space category" in categories
    assert "design" in categories
@pytest.mark.skip("sometimes fails")
@pytest.mark.asyncio
async def test_sync_entity_with_order_dependent_relations(
    sync_service: SyncService, project_config: ProjectConfig
):
    """Test that order of entity syncing doesn't affect relation creation.

    Graph under test: A -> B, A -> C, B -> C, C -> A. Every edge must resolve
    regardless of the order files are discovered during sync.
    """
    project_dir = project_config.home

    # Create several interrelated entities
    entities = {
        "a": """
---
type: knowledge
permalink: concept/entity-a
created: 2024-01-01
modified: 2024-01-01
---
# Entity A
## Observations
- depends on b
- depends on c
## Relations
- depends_on [[concept/entity-b]]
- depends_on [[concept/entity-c]]
""",
        "b": """
---
type: knowledge
permalink: concept/entity-b
created: 2024-01-01
modified: 2024-01-01
---
# Entity B
## Observations
- depends on c
## Relations
- depends_on [[concept/entity-c]]
""",
        "c": """
---
type: knowledge
permalink: concept/entity-c
created: 2024-01-01
modified: 2024-01-01
---
# Entity C
## Observations
- depends on a
## Relations
- depends_on [[concept/entity-a]]
""",
    }

    # Create files in different orders and verify results are the same
    for name, content in entities.items():
        await create_test_file(project_dir / f"concept/entity_{name}.md", content)

    # Sync
    await sync_service.sync(project_config.home)

    # Verify all relations are created correctly regardless of order
    entity_a = await sync_service.entity_service.repository.get_by_permalink("concept/entity-a")
    entity_b = await sync_service.entity_service.repository.get_by_permalink("concept/entity-b")
    entity_c = await sync_service.entity_service.repository.get_by_permalink("concept/entity-c")

    # Verify outgoing relations by checking actual targets
    a_outgoing_targets = {rel.to_id for rel in entity_a.outgoing_relations}
    assert entity_b.id in a_outgoing_targets, (
        f"A should depend on B. A's targets: {a_outgoing_targets}, B's ID: {entity_b.id}"
    )
    assert entity_c.id in a_outgoing_targets, (
        f"A should depend on C. A's targets: {a_outgoing_targets}, C's ID: {entity_c.id}"
    )
    assert len(entity_a.outgoing_relations) == 2, "A should have exactly 2 outgoing relations"

    b_outgoing_targets = {rel.to_id for rel in entity_b.outgoing_relations}
    assert entity_c.id in b_outgoing_targets, "B should depend on C"
    assert len(entity_b.outgoing_relations) == 1, "B should have exactly 1 outgoing relation"

    c_outgoing_targets = {rel.to_id for rel in entity_c.outgoing_relations}
    assert entity_a.id in c_outgoing_targets, "C should depend on A"
    assert len(entity_c.outgoing_relations) == 1, "C should have exactly 1 outgoing relation"

    # Verify incoming relations by checking actual sources
    a_incoming_sources = {rel.from_id for rel in entity_a.incoming_relations}
    assert entity_c.id in a_incoming_sources, "A should have incoming relation from C"

    b_incoming_sources = {rel.from_id for rel in entity_b.incoming_relations}
    assert entity_a.id in b_incoming_sources, "B should have incoming relation from A"

    c_incoming_sources = {rel.from_id for rel in entity_c.incoming_relations}
    assert entity_a.id in c_incoming_sources, "C should have incoming relation from A"
    assert entity_b.id in c_incoming_sources, "C should have incoming relation from B"
@pytest.mark.asyncio
async def test_sync_empty_directories(sync_service: SyncService, project_config: ProjectConfig):
    """Syncing a project with no files completes without raising."""
    home = project_config.home
    await sync_service.sync(home)
    # A no-op sync must leave the project root in place.
    assert home.exists()
@pytest.mark.skip("flaky on Windows due to filesystem timing precision")
@pytest.mark.asyncio
async def test_sync_file_modified_during_sync(
    sync_service: SyncService, project_config: ProjectConfig
):
    """Test handling of files that change during sync process.

    A concurrent write may leave the entity with a null checksum; a follow-up
    sync is expected to repair it.
    """
    # Create initial files
    doc_path = project_config.home / "changing.md"
    await create_test_file(
        doc_path,
        """
---
type: knowledge
id: changing
created: 2024-01-01
modified: 2024-01-01
---
# Knowledge File
## Observations
- This is a test
""",
    )

    # Setup async modification during sync
    async def modify_file():
        await asyncio.sleep(0.1)  # Small delay to ensure sync has started
        doc_path.write_text("Modified during sync")

    # Run sync and modification concurrently
    await asyncio.gather(sync_service.sync(project_config.home), modify_file())

    # Verify final state
    doc = await sync_service.entity_service.repository.get_by_permalink("changing")
    assert doc is not None

    # if we failed in the middle of a sync, the next one should fix it.
    if doc.checksum is None:
        await sync_service.sync(project_config.home)
        doc = await sync_service.entity_service.repository.get_by_permalink("changing")
    assert doc.checksum is not None
@pytest.mark.asyncio
async def test_permalink_formatting(
    sync_service: SyncService, project_config: ProjectConfig, entity_service: EntityService
):
    """Test that permalinks are properly formatted during sync.

    Filenames with underscores, mixed case, and spaces must normalize to
    kebab-case permalinks, prefixed with the project's own permalink.
    """
    project_prefix = generate_permalink(project_config.name)

    # Test cases with different filename formats
    test_files = {
        # filename -> expected permalink
        "my_awesome_feature.md": "my-awesome-feature",
        "MIXED_CASE_NAME.md": "mixed-case-name",
        "spaces and_underscores.md": "spaces-and-underscores",
        "design/model_refactor.md": "design/model-refactor",
        "test/multiple_word_directory/feature_name.md": "test/multiple-word-directory/feature-name",
    }

    # Create test files (all share the same minimal content)
    content: str = """
---
type: knowledge
created: 2024-01-01
modified: 2024-01-01
---
# Test File
Testing permalink generation.
"""
    for filename in test_files:
        await create_test_file(project_config.home / filename, content)

    # Run sync once after all files are created
    await sync_service.sync(project_config.home)

    # Verify permalinks
    entities = await entity_service.repository.find_all()
    for filename, expected_permalink in test_files.items():
        # Find entity for this file
        entity = next(e for e in entities if e.file_path == filename)
        expected_full_permalink = f"{project_prefix}/{expected_permalink}"
        # Fix: the failure message previously printed the literal "(unknown)"
        # instead of interpolating the offending filename.
        assert entity.permalink == expected_full_permalink, (
            f"File {filename} should have permalink {expected_full_permalink}"
        )
@pytest.mark.asyncio
async def test_handle_entity_deletion(
    test_graph,
    sync_service: SyncService,
    entity_repository: EntityRepository,
    search_service: SearchService,
):
    """Test deletion of entity cleans up search index."""
    root_entity = test_graph["root"]

    # Delete the entity
    await sync_service.handle_delete(root_entity.file_path)

    # Verify entity is gone from db
    assert await entity_repository.get_by_permalink(root_entity.permalink) is None

    # Verify entity is gone from search index
    entity_results = await search_service.search(SearchQuery(text=root_entity.title))
    assert len(entity_results) == 0

    # Verify its observations are gone from the index too
    obs_results = await search_service.search(SearchQuery(text="Root note 1"))
    assert len(obs_results) == 0

    # Verify relations from root entity are gone
    # (Postgres stemming would match "connects_to" with "connected_to", so use permalink)
    rel_results = await search_service.search(SearchQuery(permalink=root_entity.permalink))
    assert len(rel_results) == 0
@pytest.mark.asyncio
async def test_sync_preserves_timestamps(
    sync_service: SyncService,
    project_config: ProjectConfig,
    entity_service: EntityService,
):
    """Test that sync preserves file timestamps and frontmatter dates."""
    project_dir = project_config.home

    # Create a file with frontmatter
    # NOTE(review): despite the original comment, no explicit dates are set
    # here; the assertions below only require the date fields to be populated.
    frontmatter_content = """
---
type: knowledge
---
# Explicit Dates
Testing frontmatter dates
"""
    await create_test_file(project_dir / "explicit_dates.md", frontmatter_content)

    # Create a file without dates (will use file timestamps)
    file_dates_content = """
---
type: knowledge
---
# File Dates
Testing file timestamps
"""
    file_path = project_dir / "file_dates3.md"
    await create_test_file(file_path, file_dates_content)

    # Run sync
    await sync_service.sync(project_config.home)

    # Check explicit frontmatter dates
    project_prefix = generate_permalink(project_config.name)
    explicit_entity = await entity_service.get_by_permalink(f"{project_prefix}/explicit-dates")
    assert explicit_entity.created_at is not None
    assert explicit_entity.updated_at is not None

    # Check file timestamps
    file_entity = await entity_service.get_by_permalink(f"{project_prefix}/file-dates3")
    file_stats = file_path.stat()

    # Compare using epoch timestamps to handle timezone differences correctly
    # This ensures we're comparing the actual points in time, not display representations
    entity_created_epoch = file_entity.created_at.timestamp()
    entity_updated_epoch = file_entity.updated_at.timestamp()

    # Allow 2s difference due to filesystem timing precision and database processing delays
    # Windows has coarser filesystem timestamps, but Postgres can also have slight timing differences
    tolerance = 2
    # NOTE(review): st_ctime is inode-change time on POSIX, not creation time;
    # for a freshly created file the two coincide, which this comparison relies on.
    assert abs(entity_created_epoch - file_stats.st_ctime) < tolerance
    assert abs(entity_updated_epoch - file_stats.st_mtime) < tolerance
@pytest.mark.asyncio
async def test_sync_updates_timestamps_on_file_modification(
    sync_service: SyncService,
    project_config: ProjectConfig,
    entity_service: EntityService,
):
    """Test that sync updates entity timestamps when files are modified.

    This test specifically validates that when an existing file is modified and re-synced,
    the updated_at timestamp in the database reflects the file's actual modification time,
    not the database operation time. This is critical for accurate temporal ordering in
    search and recent_activity queries.
    """
    project_dir = project_config.home

    # Create initial file
    initial_content = """
---
type: knowledge
---
# Test File
Initial content for timestamp test
"""
    file_path = project_dir / "timestamp_test.md"
    await create_test_file(file_path, initial_content)

    # Initial sync
    await sync_service.sync(project_config.home)

    # Get initial entity and timestamps
    project_prefix = generate_permalink(project_config.name)
    entity_before = await entity_service.get_by_permalink(f"{project_prefix}/timestamp-test")
    initial_updated_at = entity_before.updated_at

    # Modify the file content and update mtime to be newer than watermark
    modified_content = """
---
type: knowledge
---
# Test File
Modified content for timestamp test
## Observations
- [test] This was modified
"""
    file_path.write_text(modified_content)

    # Touch file to ensure mtime is newer than watermark
    # This uses our helper which sleeps 500ms and rewrites to guarantee mtime change
    await touch_file(file_path)

    # Get the file's modification time after our changes
    file_stats_after_modification = file_path.stat()

    # Force full scan to ensure the modified file is detected
    # (incremental scans have timing precision issues with watermarks on some filesystems)
    await force_full_scan(sync_service)

    # Re-sync the modified file
    await sync_service.sync(project_config.home)

    # Get entity after re-sync
    entity_after = await entity_service.get_by_permalink(f"{project_prefix}/timestamp-test")

    # Verify that updated_at changed
    assert entity_after.updated_at != initial_updated_at, (
        "updated_at should change when file is modified"
    )

    # Verify that updated_at matches the file's modification time, not db operation time
    entity_updated_epoch = entity_after.updated_at.timestamp()
    file_mtime = file_stats_after_modification.st_mtime

    # Allow 2s difference due to filesystem timing precision and sync processing delays
    tolerance = 2
    assert abs(entity_updated_epoch - file_mtime) < tolerance, (
        f"Entity updated_at ({entity_after.updated_at}) should match file mtime "
        f"({datetime.fromtimestamp(file_mtime)}) within {tolerance}s tolerance"
    )

    # Verify the content was actually updated
    assert len(entity_after.observations) == 1
    assert entity_after.observations[0].content == "This was modified"
@pytest.mark.asyncio
async def test_file_move_updates_search_index(
    sync_service: SyncService,
    project_config: ProjectConfig,
    search_service: SearchService,
):
    """Test that moving a file updates its path in the search index."""
    project_dir = project_config.home

    # Create initial file
    content = """
---
type: knowledge
---
# Test Move
Content for move test
"""
    old_path = project_dir / "old" / "test_move.md"
    old_path.parent.mkdir(parents=True)
    await create_test_file(old_path, content)

    # Initial sync
    await sync_service.sync(project_config.home)

    # Move the file
    new_path = project_dir / "new" / "moved_file.md"
    new_path.parent.mkdir(parents=True)
    old_path.rename(new_path)

    # Force full scan to detect the move
    # (rename doesn't update mtime, so incremental scan won't find it)
    await force_full_scan(sync_service)

    # Second sync should detect the move
    await sync_service.sync(project_config.home)

    # Check search index has updated path (stored as a POSIX-style relative path)
    results = await search_service.search(SearchQuery(text="Content for move test"))
    assert len(results) == 1
    assert results[0].file_path == new_path.relative_to(project_dir).as_posix()
@pytest.mark.asyncio
async def test_sync_null_checksum_cleanup(
    sync_service: SyncService,
    project_config: ProjectConfig,
    entity_service: EntityService,
):
    """Test handling of entities with null checksums from incomplete syncs."""
    # Create entity with null checksum (simulating incomplete sync)
    entity = Entity(
        permalink="concept/incomplete",
        title="Incomplete",
        note_type="test",
        file_path="concept/incomplete.md",
        checksum=None,  # Null checksum
        content_type="text/markdown",
        created_at=datetime.now(timezone.utc),
        updated_at=datetime.now(timezone.utc),
    )
    await entity_service.repository.add(entity)

    # Create corresponding file
    content = """
---
type: knowledge
id: concept/incomplete
created: 2024-01-01
modified: 2024-01-01
---
# Incomplete Entity
## Observations
- Testing cleanup
"""
    await create_test_file(project_config.home / "concept/incomplete.md", content)

    # Run sync
    await sync_service.sync(project_config.home)

    # Verify entity was properly synced: the checksum is backfilled from the file
    updated = await entity_service.get_by_permalink("concept/incomplete")
    assert updated.checksum is not None
@pytest.mark.asyncio
async def test_sync_permalink_resolved(
    sync_service: SyncService, project_config: ProjectConfig, file_service: FileService, app_config
):
    """Test that we resolve duplicate permalinks on sync.

    After a move assigns one file the permalink new/moved-file, a second file
    claiming the same permalink must be deduplicated with a "-1" suffix.
    """
    project_dir = project_config.home
    project_prefix = generate_permalink(project_config.name)

    # Create initial file
    content = """
---
type: knowledge
---
# Test Move
Content for move test
"""
    old_path = project_dir / "old" / "test_move.md"
    old_path.parent.mkdir(parents=True)
    await create_test_file(old_path, content)

    # Initial sync
    await sync_service.sync(project_config.home)

    # Move the file
    new_path = project_dir / "new" / "moved_file.md"
    new_path.parent.mkdir(parents=True)
    old_path.rename(new_path)

    # Force full scan to detect the move
    # (rename doesn't update mtime, so incremental scan won't find it)
    await force_full_scan(sync_service)

    # Sync again
    await sync_service.sync(project_config.home)
    file_content, _ = await file_service.read_file(new_path)
    assert f"permalink: {project_prefix}/new/moved-file" in file_content

    # Create another that has the same permalink
    content = f"""
---
type: knowledge
permalink: {project_prefix}/new/moved-file
---
# Test Move
Content for move test
"""
    old_path = project_dir / "old" / "test_move.md"
    old_path.parent.mkdir(parents=True, exist_ok=True)
    await create_test_file(old_path, content)

    # Force full scan to detect the new file
    # (file just created may not be newer than watermark due to timing precision)
    await force_full_scan(sync_service)

    # Sync new file
    await sync_service.sync(project_config.home)

    # assert permalink is unique (conflict resolved with a "-1" suffix)
    file_content, _ = await file_service.read_file(old_path)
    assert f"permalink: {project_prefix}/new/moved-file-1" in file_content
@pytest.mark.asyncio
async def test_sync_permalink_resolved_on_update(
    sync_service: SyncService,
    project_config: ProjectConfig,
    file_service: FileService,
):
    """Test that sync resolves permalink conflicts on update.

    An updated or newly created file that claims an already-taken permalink
    must not steal it: the existing owner keeps its permalink and the
    newcomer gets a "-1" suffix.
    """
    project_dir = project_config.home
    one_file = project_dir / "one.md"
    two_file = project_dir / "two.md"

    await create_test_file(
        one_file,
        content=dedent(
            """
            ---
            permalink: one
            ---
            test content
            """
        ),
    )
    await create_test_file(
        two_file,
        content=dedent(
            """
            ---
            permalink: two
            ---
            test content
            """
        ),
    )

    # Run sync
    await sync_service.sync(project_config.home)

    # Check permalinks
    file_one_content, _ = await file_service.read_file(one_file)
    assert "permalink: one" in file_one_content
    file_two_content, _ = await file_service.read_file(two_file)
    assert "permalink: two" in file_two_content

    # update the second file with a duplicate permalink
    updated_content = """
---
title: two.md
type: note
permalink: one
tags: []
---
test content
"""
    two_file.write_text(updated_content)

    # Force full scan to detect the modified file
    # (file just modified may not be newer than watermark due to timing precision)
    await force_full_scan(sync_service)

    # Run sync
    await sync_service.sync(project_config.home)

    # Check permalinks: the updated file must not take over "one"
    file_two_content, _ = await file_service.read_file(two_file)
    assert "permalink: two" in file_two_content

    # new content with duplicate permalink
    new_content = """
---
title: new.md
type: note
permalink: one
tags: []
---
test content
"""
    new_file = project_dir / "new.md"
    await create_test_file(new_file, new_content)

    # Force full scan to detect the new file
    # (file just created may not be newer than watermark due to timing precision)
    await force_full_scan(sync_service)

    # Run another time
    await sync_service.sync(project_config.home)

    # Should have deduplicated permalink with a "-1" suffix
    new_file_content, _ = await file_service.read_file(new_file)
    assert "permalink: one-1" in new_file_content
@pytest.mark.asyncio
async def test_sync_permalink_not_created_if_no_frontmatter(
    sync_service: SyncService,
    project_config: ProjectConfig,
    file_service: FileService,
    app_config: BasicMemoryConfig,
):
    """Test that sync does not add frontmatter when ensure_frontmatter_on_sync is disabled."""
    app_config.ensure_frontmatter_on_sync = False

    target = project_config.home / "one.md"
    await create_test_file(target)

    await sync_service.sync(project_config.home)

    # With the feature off, the file must remain free of any permalink entry.
    contents, _ = await file_service.read_file(target)
    assert "permalink:" not in contents
@pytest.mark.asyncio
async def test_sync_frontmatter_created_if_missing_when_enabled(
    sync_service: SyncService,
    project_config: ProjectConfig,
    file_service: FileService,
    app_config: BasicMemoryConfig,
):
    """Sync should add derived frontmatter when configured for missing-frontmatter files."""
    app_config.ensure_frontmatter_on_sync = True

    note_path = project_config.home / "one.md"
    await create_test_file(note_path, "# One\n")

    await sync_service.sync(project_config.home)

    prefix = generate_permalink(project_config.name)
    contents, _ = await file_service.read_file(note_path)

    # Derived frontmatter fields are written back into the file...
    for expected in ("title: one", "type: note", f"permalink: {prefix}/one"):
        assert expected in contents

    # ...and the indexed entity carries the same permalink.
    entity = await sync_service.entity_repository.get_by_file_path("one.md")
    assert entity is not None
    assert entity.permalink == f"{prefix}/one"
@pytest.mark.asyncio
async def test_sync_frontmatter_created_if_missing_overrides_disable_permalinks(
    sync_service: SyncService,
    project_config: ProjectConfig,
    file_service: FileService,
    app_config: BasicMemoryConfig,
):
    """Missing-frontmatter sync path should write permalink even when disable_permalinks is true."""
    app_config.ensure_frontmatter_on_sync = True
    app_config.disable_permalinks = True

    note_path = project_config.home / "override.md"
    await create_test_file(note_path, "# Override\n")

    await sync_service.sync(project_config.home)

    prefix = generate_permalink(project_config.name)

    # Despite disable_permalinks, the missing-frontmatter path writes one.
    content, _ = await file_service.read_file(note_path)
    assert f"permalink: {prefix}/override" in content

    entity = await sync_service.entity_repository.get_by_file_path("override.md")
    assert entity is not None
    assert entity.permalink == f"{prefix}/override"
@pytest.fixture
def test_config_update_permamlinks_on_move(app_config) -> BasicMemoryConfig:
    """App config fixture with update_permalinks_on_move enabled.

    NOTE(review): the fixture name contains a typo ("permamlinks"); it is
    requested by name in tests below, so renaming must update both places.
    """
    app_config.update_permalinks_on_move = True
    return app_config
@pytest.mark.asyncio
async def test_sync_permalink_updated_on_move(
    test_config_update_permamlinks_on_move: BasicMemoryConfig,
    project_config: ProjectConfig,
    sync_service: SyncService,
    file_service: FileService,
):
    """Permalinks follow a moved file when update_permalinks_on_move is enabled."""
    home = project_config.home
    prefix = generate_permalink(project_config.name)

    # Seed a note in an "old" directory.
    source_path = home / "old" / "test_move.md"
    source_path.parent.mkdir(parents=True)
    await create_test_file(
        source_path,
        dedent(
            """
            ---
            type: knowledge
            ---
            # Test Move
            Content for move test
            """
        ),
    )

    await sync_service.sync(home)

    # Sanity check: the permalink reflects the original location.
    before, _ = await file_service.read_file(source_path)
    assert f"permalink: {prefix}/old/test-move" in before

    # Move the note into a "new" directory.
    target_path = home / "new" / "moved_file.md"
    target_path.parent.mkdir(parents=True)
    source_path.rename(target_path)

    # rename() does not bump mtime, so an incremental scan would miss the move.
    await force_full_scan(sync_service)
    await sync_service.sync(home)

    after, _ = await file_service.read_file(target_path)
    assert f"permalink: {prefix}/new/moved-file" in after
@pytest.mark.asyncio
async def test_sync_non_markdown_files(sync_service, project_config, test_files):
    """Non-markdown files are synced and get entities with the right content type."""
    report = await sync_service.sync(project_config.home)
    assert report.total == 2

    # Membership can be tested directly; `[f for f in report.new]` just copied
    # the collection for no benefit.
    assert test_files["pdf"].name in report.new
    assert test_files["image"].name in report.new

    # Verify entities were created with the correct content types.
    pdf_entity = await sync_service.entity_repository.get_by_file_path(str(test_files["pdf"].name))
    assert pdf_entity is not None, "PDF entity should have been created"
    assert pdf_entity.content_type == "application/pdf"

    image_entity = await sync_service.entity_repository.get_by_file_path(
        str(test_files["image"].name)
    )
    # Guard against AttributeError on a missing entity before dereferencing.
    assert image_entity is not None, "Image entity should have been created"
    assert image_entity.content_type == "image/png"
@pytest.mark.asyncio
async def test_sync_non_markdown_files_modified(
    sync_service, project_config, test_files, file_service
):
    """Modified non-markdown files are re-synced and checksums updated."""
    report = await sync_service.sync(project_config.home)
    assert report.total == 2

    # Direct membership test; the previous list comprehension was a no-op copy.
    assert test_files["pdf"].name in report.new
    assert test_files["image"].name in report.new

    # Modify both files on disk.
    test_files["pdf"].write_text("New content")
    test_files["image"].write_text("New content")

    # Force full scan to detect the modified files
    # (files just modified may not be newer than watermark due to timing precision)
    await force_full_scan(sync_service)
    report = await sync_service.sync(project_config.home)
    assert len(report.modified) == 2

    # Only the checksums matter here; discard the file contents.
    _, pdf_checksum = await file_service.read_file(test_files["pdf"].name)
    _, img_checksum = await file_service.read_file(test_files["image"].name)

    pdf_entity = await sync_service.entity_repository.get_by_file_path(str(test_files["pdf"].name))
    image_entity = await sync_service.entity_repository.get_by_file_path(
        str(test_files["image"].name)
    )
    assert pdf_entity.checksum == pdf_checksum
    assert image_entity.checksum == img_checksum
@pytest.mark.asyncio
async def test_sync_non_markdown_files_move(sync_service, project_config, test_files):
    """Moving a non-markdown file is detected and its entity follows the file."""
    report = await sync_service.sync(project_config.home)
    assert report.total == 2

    # Direct membership test; the previous list comprehension was a no-op copy.
    assert test_files["pdf"].name in report.new
    assert test_files["image"].name in report.new

    test_files["pdf"].rename(project_config.home / "moved_pdf.pdf")

    # Force full scan to detect the move
    # (rename doesn't update mtime, so incremental scan won't find it)
    await force_full_scan(sync_service)
    report2 = await sync_service.sync(project_config.home)
    assert len(report2.moves) == 1

    # The entity now lives at the new path; non-markdown files have no permalink.
    pdf_entity = await sync_service.entity_repository.get_by_file_path("moved_pdf.pdf")
    assert pdf_entity is not None
    assert pdf_entity.permalink is None
@pytest.mark.asyncio
async def test_sync_non_markdown_files_deleted(sync_service, project_config, test_files):
    """Deleting a non-markdown file removes its entity from the database."""
    report = await sync_service.sync(project_config.home)
    assert report.total == 2

    # Direct membership test; the previous list comprehension was a no-op copy.
    assert test_files["pdf"].name in report.new
    assert test_files["image"].name in report.new

    test_files["pdf"].unlink()
    report2 = await sync_service.sync(project_config.home)
    assert len(report2.deleted) == 1

    # Bug fix: this previously queried "moved_pdf.pdf" (copy-pasted from the
    # move test), a path that never existed here, so the None assertion passed
    # vacuously. Check the entity of the file that was actually deleted.
    pdf_entity = await sync_service.entity_repository.get_by_file_path(
        str(test_files["pdf"].name)
    )
    assert pdf_entity is None
@pytest.mark.asyncio
async def test_sync_non_markdown_files_move_with_delete(
    sync_service, project_config, test_files, file_service
):
    """A delete followed by a rename into the freed path must sync cleanly."""
    home = project_config.home

    # Two files; the second will take over the first one's path.
    await create_test_file(home / "doc.pdf", "content1")
    await create_test_file(home / "other/doc-1.pdf", "content2")
    await sync_service.sync(home)

    # Remove the original, then rename the other file onto its path.
    (home / "doc.pdf").unlink()
    (home / "other/doc-1.pdf").rename(home / "doc.pdf")

    await sync_service.sync(home)

    # The surviving entity sits at doc.pdf with the renamed file's content.
    moved_entity = await sync_service.entity_repository.get_by_file_path("doc.pdf")
    assert moved_entity is not None
    assert moved_entity.permalink is None
    file_content, _ = await file_service.read_file("doc.pdf")
    assert "content2" in file_content
@pytest.mark.asyncio
async def test_sync_relation_to_non_markdown_file(
    sync_service: SyncService, project_config: ProjectConfig, file_service: FileService, test_files
):
    """A note may relate to a non-markdown file; sync adds a permalink and nothing else."""
    prefix = generate_permalink(project_config.name)
    note_path = project_config.home / "note.md"
    await create_test_file(
        note_path,
        f"""
---
title: a note
type: note
tags: []
---
- relates_to [[{test_files["pdf"].name}]]
""",
    )

    await sync_service.sync(project_config.home)

    # After sync the note gains only a permalink line in its frontmatter.
    synced, _ = await file_service.read_file(note_path)
    expected = f"""---
title: a note
type: note
tags: []
permalink: {prefix}/note
---
- relates_to [[{test_files["pdf"].name}]]
""".strip()
    assert expected == synced
@pytest.mark.asyncio
async def test_sync_regular_file_race_condition_handling(
    sync_service: SyncService, project_config: ProjectConfig
):
    """Test that sync_regular_file handles race condition with IntegrityError (lines 380-401).

    datetime/timezone are imported at module level; the redundant
    function-scope re-import has been removed.
    """
    # Create a test file
    test_file = project_config.home / "test_race.md"
    test_content = """
---
type: knowledge
---
# Test Race Condition
This is a test file for race condition handling.
"""
    await create_test_file(test_file, test_content)
    rel_path = test_file.relative_to(project_config.home).as_posix()

    # Pre-insert an entity with the same file_path so the "add" call inside
    # sync_regular_file raises a real DB IntegrityError, mimicking the
    # race-condition branch.
    await sync_service.entity_repository.add(
        Entity(
            note_type="file",
            file_path=rel_path,
            checksum="old_checksum",
            title="Test Race Condition",
            created_at=datetime.now(timezone.utc),
            updated_at=datetime.now(timezone.utc),
            content_type="text/markdown",
            mtime=None,
            size=None,
        )
    )

    # With new=True the insert conflicts and the code must fall back to update.
    entity, checksum = await sync_service.sync_regular_file(rel_path, new=True)
    assert entity is not None
    assert entity.file_path == rel_path
    assert entity.checksum == checksum
@pytest.mark.asyncio
async def test_circuit_breaker_should_skip_after_three_recorded_failures(
    sync_service: SyncService, project_config: ProjectConfig
):
    """Three recorded failures trip the circuit breaker for an unchanged file."""
    failing = project_config.home / "failing_file.md"
    await create_test_file(failing, "---\ntype: note\n---\ncontent\n")
    rel_path = failing.relative_to(project_config.home).as_posix()

    # Record three consecutive failures for the same, unchanged file.
    for attempt in (1, 2, 3):
        await sync_service._record_failure(rel_path, f"failure {attempt}")

    assert await sync_service._should_skip_file(rel_path) is True
    assert rel_path in sync_service._file_failures
    assert sync_service._file_failures[rel_path].count == 3
@pytest.mark.asyncio
async def test_circuit_breaker_resets_when_checksum_changes(
    sync_service: SyncService, project_config: ProjectConfig
):
    """A changed checksum clears recorded failures so the file is retried."""
    changing = project_config.home / "changing_file.md"
    await create_test_file(changing, "---\ntype: note\n---\ncontent\n")
    rel_path = changing.relative_to(project_config.home).as_posix()

    # Trip the breaker with three recorded failures.
    for attempt in (1, 2, 3):
        await sync_service._record_failure(rel_path, f"failure {attempt}")
    assert await sync_service._should_skip_file(rel_path) is True

    # New content -> new checksum -> breaker resets and the file is retried.
    changing.write_text("---\ntype: note\n---\nchanged content\n")
    assert await sync_service._should_skip_file(rel_path) is False
    assert rel_path not in sync_service._file_failures
@pytest.mark.asyncio
async def test_record_failure_uses_empty_checksum_when_checksum_computation_fails(
    sync_service: SyncService,
):
    """Recording a failure for a missing file stores an empty checksum rather than crashing."""
    missing_path = "does-not-exist.md"
    await sync_service._record_failure(missing_path, "boom")

    # The failure is recorded with "" as the checksum placeholder.
    failure = sync_service._file_failures.get(missing_path)
    assert failure is not None
    assert failure.last_checksum == ""
@pytest.mark.asyncio
async def test_sync_fatal_error_terminates_sync_immediately(
    sync_service: SyncService, project_config: ProjectConfig, entity_service: EntityService
):
    """Test that SyncFatalError terminates sync immediately without circuit breaker retry.

    This tests the fix for issue #188 where project deletion during sync should
    terminate immediately rather than retrying each file 3 times.
    """
    # Intentionally skipped: triggering SyncFatalError requires deleting the
    # project mid-sync, which cannot be reproduced deterministically here.
    pytest.skip(
        "SyncFatalError behavior is excluded from coverage and not reliably reproducible "
        "without patching (depends on project deletion during sync)."
    )
@pytest.mark.asyncio
async def test_scan_directory_basic(sync_service: SyncService, project_config: ProjectConfig):
    """Streaming scan finds files at every nesting depth and carries stat info."""
    root = project_config.home

    # Files at the root, one level down, and two levels down.
    await create_test_file(root / "root.md", "root content")
    await create_test_file(root / "subdir/file1.md", "file 1 content")
    await create_test_file(root / "subdir/file2.md", "file 2 content")
    await create_test_file(root / "subdir/nested/file3.md", "file 3 content")

    # Drain the async iterator, normalizing to project-relative paths.
    found = []
    async for file_path, stat_info in sync_service.scan_directory(root):
        found.append((Path(file_path).relative_to(root).as_posix(), stat_info))

    # Exactly the four created files, nothing more.
    names = {path for path, _ in found}
    assert names == {
        "root.md",
        "subdir/file1.md",
        "subdir/file2.md",
        "subdir/nested/file3.md",
    }

    # Each entry carries cached stat info from the scan itself.
    for _, stat_info in found:
        assert stat_info is not None
        assert stat_info.st_size > 0  # files have content
        assert stat_info.st_mtime > 0  # and a modification time
@pytest.mark.asyncio
async def test_scan_directory_respects_ignore_patterns(
    sync_service: SyncService, project_config: ProjectConfig
):
    """Streaming scan must honor .gitignore patterns (alongside .bmignore)."""
    root = project_config.home

    # Ignore *.ignored files and the .hidden/ directory.
    (root / ".gitignore").write_text("*.ignored\n.hidden/\n")

    # Re-read patterns so the scan picks up the .gitignore just written.
    from basic_memory.ignore_utils import load_gitignore_patterns

    sync_service._ignore_patterns = load_gitignore_patterns(root)

    # A mix of files that should and should not be reported.
    await create_test_file(root / "included.md", "included")
    await create_test_file(root / "excluded.ignored", "excluded")
    await create_test_file(root / ".hidden/secret.md", "secret")
    await create_test_file(root / "subdir/file.md", "file")

    seen = [
        Path(file_path).relative_to(root).as_posix()
        async for file_path, _ in sync_service.scan_directory(root)
    ]

    assert "included.md" in seen
    assert "subdir/file.md" in seen
    assert "excluded.ignored" not in seen
    assert ".hidden/secret.md" not in seen
    # The ignore file itself must also be excluded.
    assert ".bmignore" not in seen
@pytest.mark.asyncio
async def test_scan_directory_cached_stat_info(
    sync_service: SyncService, project_config: ProjectConfig
):
    """Stat info cached by the scan should match a fresh, independent stat call."""
    root = project_config.home
    target = root / "test.md"
    await create_test_file(target, "test content")

    async for file_path, stat_info in sync_service.scan_directory(root):
        if Path(file_path).name != "test.md":
            continue
        fresh = target.stat()
        assert stat_info.st_size == fresh.st_size
        # Timestamps compared with 1s tolerance to absorb filesystem precision.
        assert abs(stat_info.st_mtime - fresh.st_mtime) < 1
        assert abs(stat_info.st_ctime - fresh.st_ctime) < 1
        break
@pytest.mark.asyncio
async def test_scan_directory_empty_directory(
    sync_service: SyncService, project_config: ProjectConfig
):
    """Scanning a project with no user files yields nothing (hidden dirs are ignored)."""
    root = project_config.home
    assert root.exists()

    # No user files created; any .basic-memory config dir is hidden and skipped.
    found = [file_path async for file_path, _ in sync_service.scan_directory(root)]
    assert found == []
@pytest.mark.asyncio
async def test_scan_directory_handles_permission_error(
    sync_service: SyncService, project_config: ProjectConfig
):
    """Test that streaming scan handles permission errors gracefully."""
    import os
    import sys

    # Skip on Windows - permission handling is different
    if sys.platform == "win32":
        pytest.skip("Permission tests not reliable on Windows")
    # chmod(0o000) does not restrict root, so the restricted directory would
    # still be readable when running as root (common in CI containers).
    if os.geteuid() == 0:
        pytest.skip("Permission tests not meaningful when running as root")

    project_dir = project_config.home

    # Create accessible file
    await create_test_file(project_dir / "accessible.md", "accessible")

    # Create a directory the scanner cannot descend into.
    restricted_dir = project_dir / "restricted"
    restricted_dir.mkdir()
    await create_test_file(restricted_dir / "secret.md", "secret")
    restricted_dir.chmod(0o000)

    try:
        # Scan should swallow the permission error and keep going.
        results = []
        async for file_path, stat_info in sync_service.scan_directory(project_dir):
            rel_path = Path(file_path).relative_to(project_dir).as_posix()
            results.append(rel_path)

        # Should have found accessible file but not restricted one
        assert "accessible.md" in results
        assert "restricted/secret.md" not in results
    finally:
        # Restore permissions so test cleanup can remove the directory.
        restricted_dir.chmod(0o755)
@pytest.mark.asyncio
async def test_scan_directory_non_markdown_files(
    sync_service: SyncService, project_config: ProjectConfig
):
    """The streaming scan is format-agnostic: every file type is reported."""
    root = project_config.home

    # One markdown file plus binary, JSON, and Python files.
    await create_test_file(root / "doc.md", "markdown")
    (root / "image.png").write_bytes(b"PNG content")
    (root / "data.json").write_text('{"key": "value"}')
    (root / "script.py").write_text("print('hello')")

    seen = [
        Path(file_path).relative_to(root).as_posix()
        async for file_path, _ in sync_service.scan_directory(root)
    ]

    # Every created file must appear regardless of type.
    for expected in ("doc.md", "image.png", "data.json", "script.py"):
        assert expected in seen
@pytest.mark.asyncio
async def test_file_service_checksum_correctness(
    sync_service: SyncService, project_config: ProjectConfig
):
    """FileService checksums must be the SHA-256 hex digest of the file bytes."""
    import hashlib

    body = "Test content for checksum validation" * 10
    target = project_config.home / "small.md"
    await create_test_file(target, body)

    rel_path = target.relative_to(project_config.home).as_posix()
    checksum = await sync_service.file_service.compute_checksum(rel_path)

    # Compare against an independently computed digest of the same bytes.
    assert checksum == hashlib.sha256(body.encode("utf-8")).hexdigest()
    # SHA-256 hex digests are always 64 characters long.
    assert len(checksum) == 64
@pytest.mark.asyncio
async def test_sync_handles_file_not_found_gracefully(
    sync_service: SyncService, project_config: ProjectConfig
):
    """Test that FileNotFoundError during sync is handled gracefully.

    This tests the fix for issue #386 where files existing in the database
    but missing from the filesystem would crash the sync worker.
    """
    vanishing = project_config.home / "missing_file.md"
    await create_test_file(
        vanishing,
        dedent(
            """
            ---
            type: knowledge
            permalink: missing-file
            ---
            # Missing File
            Content that will disappear
            """
        ),
    )

    # First sync records the entity in the database.
    await sync_service.sync(project_config.home)
    entity = await sync_service.entity_repository.get_by_file_path("missing_file.md")
    assert entity is not None
    assert entity.permalink == "missing-file"

    # Remove the file from disk while leaving the DB row behind,
    # simulating a DB/filesystem inconsistency.
    vanishing.unlink()

    # sync_markdown_file raises FileNotFoundError naturally; sync_file must
    # treat that as a deletion rather than crashing.
    await sync_service.sync_file("missing_file.md", new=False)

    entity = await sync_service.entity_repository.get_by_file_path("missing_file.md")
    assert entity is None, "Orphaned entity should be deleted when file is not found"
@pytest.mark.asyncio
async def test_sync_file_continues_on_semantic_dependency_error(
    sync_service: SyncService, project_config: ProjectConfig
):
    """Test that sync_file returns the entity even when vector embedding fails.

    When sqlite-vec or another semantic dependency is missing, FTS indexing
    still succeeds. The entity should be returned successfully with a warning,
    not treated as a file-level failure.
    """
    from unittest.mock import AsyncMock

    from basic_memory.repository.semantic_errors import SemanticDependenciesMissingError

    content = """---
type: note
---
# Semantic Error Test
## Observations
- [test] This entity should still be synced despite embedding failure
"""
    await create_test_file(project_config.home / "semantic_test.md", content)
    await sync_service.sync(project_config.home)

    # Swap in an index_entity that always fails with the semantic error.
    original_index = sync_service.search_service.index_entity
    call_count = 0

    async def index_with_semantic_error(entity, **kwargs):
        nonlocal call_count
        call_count += 1
        raise SemanticDependenciesMissingError("sqlite-vec package is missing")

    sync_service.search_service.index_entity = AsyncMock(side_effect=index_with_semantic_error)
    try:
        # Touch the file so the next sync re-processes it.
        await create_test_file(
            project_config.home / "semantic_test.md",
            content.replace("should still be synced", "updated content"),
        )
        entity, checksum = await sync_service.sync_file("semantic_test.md", new=False)

        assert entity is not None, "Entity should be returned even when embedding fails"
        assert checksum is not None
        # The semantic failure must not feed the circuit breaker.
        assert "semantic_test.md" not in sync_service._file_failures
    finally:
        sync_service.search_service.index_entity = original_index