# tests/unit/test_kb_manager.py
# container-mcp © 2025 by Martin Bukowski is licensed under Apache 2.0
"""Unit tests for Knowledge Base Manager."""
import pytest
import asyncio
from unittest.mock import MagicMock, AsyncMock, patch, ANY
from datetime import datetime, timezone
import cmcp.kb.search # Import the cmcp module
from cmcp.managers import KnowledgeBaseManager
from cmcp.kb.models import DocumentIndex, ImplicitRDFTriple, DocumentFragment
from cmcp.kb.path import PathComponents, PartialPathComponents
from cmcp.kb.search import (
SearchService,
SearchIndexManager,
SearchIndexRecovery,
SparseSearchIndex,
GraphSearchIndex,
Reranker,
)
from cmcp.kb.document_store import DocumentStore
# Mock dependencies - need to import after mocking is in place
# from cmcp.kb.document_store import DocumentStore
# from cmcp.kb.search import SparseSearchIndex, GraphSearchIndex, Reranker
# Mock dependencies
@pytest.fixture
def mock_doc_store():
"""Mock for the DocumentStore."""
return MagicMock(spec=DocumentStore)
@pytest.fixture
def mock_search_manager():
"""Mock for the SearchIndexManager."""
return MagicMock(spec=SearchIndexManager)
@pytest.fixture
def mock_search_recovery():
"""Mock for the SearchIndexRecovery."""
return MagicMock(spec=SearchIndexRecovery)
@pytest.fixture
def mock_search_service():
"""Async-aware mock for the SearchService."""
service = AsyncMock(spec=SearchService)
# Configure the search method to return a future-like object that can be awaited
service.search.return_value = asyncio.Future()
service.search.return_value.set_result([])
return service
@pytest.fixture
def test_config_search_enabled(test_config):
"""Fixture for test config with search enabled."""
config = test_config.model_copy(deep=True)
config.kb_config.search_enabled = True
config.kb_config.sparse_index_path = "/tmp/sparse"
config.kb_config.graph_index_path = "/tmp/graph"
return config
@pytest.fixture
def test_config_search_disabled(test_config):
"""Fixture for test config with search disabled."""
config = test_config.model_copy(deep=True)
config.kb_config.search_enabled = False
return config
@pytest.fixture
async def kb_manager(tmpdir):
"""Fixture for a fully initialized KnowledgeBaseManager with mocked dependencies."""
kb_path = str(tmpdir.mkdir("kb"))
# We patch the classes within the module where they are *used*.
with patch('cmcp.managers.knowledge_base_manager.DocumentStore', new_callable=MagicMock) as MockDocStore:
# Manually instantiate the manager, bypassing its real initialize() method in tests
manager = KnowledgeBaseManager(
storage_path=kb_path,
timeout_default=30,
timeout_max=300,
search_enabled=True,
sparse_index_path="/test/sparse",
graph_index_path="/test/graph",
reranker_model="test-model"
)
# Manually assign mocked dependencies.
# This gives tests full control over the manager's collaborators.
manager.document_store = MockDocStore()
# Use AsyncMock with a spec to ensure it has the right async methods and assertions
manager.search_service = AsyncMock(spec=SearchService)
manager.initialized = True
yield manager
@pytest.fixture
async def kb_manager_search_disabled(test_config_search_disabled):
"""Fixture for KB Manager with search disabled."""
with patch('cmcp.managers.knowledge_base_manager.DocumentStore') as MockDocStore, \
patch('os.makedirs'):
manager = KnowledgeBaseManager.from_env(test_config_search_disabled)
await manager.initialize()
manager.document_store = MockDocStore.return_value
yield manager
@pytest.fixture
def sample_components():
"""Sample PathComponents."""
return PathComponents.parse_path("ns/coll/doc1")
@pytest.fixture
def sample_index_obj(sample_components):
"""Sample DocumentIndex object."""
return DocumentIndex(
namespace=sample_components.namespace,
collection=sample_components.collection,
name=sample_components.name,
references=[ImplicitRDFTriple(predicate="references", object="kb://other/coll/doc2")]
)
# --- Test Basic CRUD ---
@pytest.mark.asyncio
async def test_create_document(kb_manager, sample_components):
"""Test creating a document."""
# Prepare test data
meta = {"test": 1}
mock_doc_store = kb_manager.document_store
# Mock check_index to return False so document doesn't already exist
mock_doc_store.check_index.return_value = False
# Prepare a mock result for document index
expected_index = DocumentIndex(
namespace=sample_components.namespace,
collection=sample_components.collection,
name=sample_components.name,
metadata=meta
)
# Set up the return value for the awaited call to write_index
mock_doc_store.write_index.return_value = expected_index
# Call the method under test
result = await kb_manager.create_document(sample_components, meta)
# Verify the document_store.write_index was called correctly
# Document store methods are synchronous, so use assert_called_once
mock_doc_store.write_index.assert_called_once()
# Verify call arguments
args, kwargs = mock_doc_store.write_index.call_args
# First argument should be the components
assert args[0] == sample_components
# Second argument should be an index with our metadata
assert isinstance(args[1], DocumentIndex)
assert args[1].metadata == meta
assert args[1].name == sample_components.name
# Verify the returned result
assert isinstance(result, DocumentIndex)
assert result.namespace == sample_components.namespace
assert result.collection == sample_components.collection
assert result.name == sample_components.name
assert result.metadata == meta
@pytest.mark.asyncio
async def test_write_content(kb_manager, sample_components):
"""Test writing content and updating sparse index."""
content = "Test content"
mock_doc_store = kb_manager.document_store
mock_search_service = kb_manager.search_service
# Create mock document index for result
updated_idx = DocumentIndex(name=sample_components.name)
# Set up mock return values
mock_doc_store.write_content.return_value = "content.0000.txt"
mock_doc_store.update_index.return_value = updated_idx
# Call the method being tested
result = await kb_manager.write_content(sample_components, content)
# Verify the correct interactions
mock_doc_store.write_content.assert_called_once_with(sample_components, content)
mock_doc_store.update_index.assert_called_once()
# Verify that the SearchService was called
mock_search_service.update_document_in_indices.assert_awaited_once_with(
sample_components.urn, content
)
# Check the returned result
assert result == updated_idx
@pytest.mark.asyncio
async def test_write_content_search_disabled(kb_manager_search_disabled, sample_components):
"""Test writing content doesn't update index when search is disabled."""
content = "Test content"
mock_doc_store = kb_manager_search_disabled.document_store
# Create a mock document index for result
updated_idx = DocumentIndex(name=sample_components.name)
# Set up mock return values
mock_doc_store.write_content.return_value = "content.0000.txt"
mock_doc_store.update_index.return_value = updated_idx
# Call the method being tested
result = await kb_manager_search_disabled.write_content(sample_components, content)
# Verify the correct interactions
mock_doc_store.write_content.assert_called_once_with(sample_components, content)
mock_doc_store.update_index.assert_called_once()
# Ensure search_service is None and was not called
assert kb_manager_search_disabled.search_service is None
# Check returned result
assert result == updated_idx
@pytest.mark.asyncio
async def test_read_content(kb_manager, sample_components):
"""Test reading content."""
expected_content = "File content here"
mock_doc_store = kb_manager.document_store
# Configure mocks
mock_doc_store.read_content.return_value = expected_content
# Call the method being tested
content = await kb_manager.read_content(sample_components)
# Verify calls
mock_doc_store.read_content.assert_called_once_with(sample_components)
# Check result
assert content == expected_content
@pytest.mark.asyncio
async def test_read_content_not_found(kb_manager, sample_components):
"""Test reading non-existent content (index exists)."""
mock_doc_store = kb_manager.document_store
# Configure mocks
mock_doc_store.read_content.return_value = None
# Call the method
content = await kb_manager.read_content(sample_components)
# Verify calls and result
mock_doc_store.read_content.assert_called_once_with(sample_components)
assert content is None
@pytest.mark.asyncio
async def test_read_index(kb_manager, sample_components, sample_index_obj):
"""Test reading index."""
mock_doc_store = kb_manager.document_store
# Configure mocks
mock_doc_store.read_index.return_value = sample_index_obj
# Call the method being tested
index = await kb_manager.read_index(sample_components)
# Verify calls and result
mock_doc_store.read_index.assert_called_once_with(sample_components)
assert index == sample_index_obj
@pytest.mark.asyncio
async def test_delete_document(kb_manager, sample_components):
"""Test deleting a document and updating indices."""
mock_doc_store = kb_manager.document_store
mock_search_service = kb_manager.search_service
# Configure mocks
mock_doc_store.check_index.return_value = True # Document exists
# Call the method being tested
result = await kb_manager.delete_document(sample_components)
# Verify calls
mock_doc_store.check_index.assert_called_once_with(sample_components)
# Check that search service was called to delete from indices
mock_search_service.delete_document_from_indices.assert_awaited_once_with(
sample_components.urn
)
# Check that document store was called to delete the document
mock_doc_store.delete_document.assert_called_once_with(sample_components)
# Verify result
assert result["status"] == "deleted"
@pytest.mark.asyncio
async def test_delete_document_not_found(kb_manager, sample_components):
"""Test deleting a document that doesn't exist."""
mock_doc_store = kb_manager.document_store
mock_search_service = kb_manager.search_service
# Configure mocks
mock_doc_store.check_index.return_value = False # Document doesn't exist
# Call the method being tested
result = await kb_manager.delete_document(sample_components)
# Verify calls
mock_doc_store.check_index.assert_called_once_with(sample_components)
mock_doc_store.delete_document.assert_not_called()
mock_search_service.delete_document_from_indices.assert_not_called()
# Verify result
assert result["status"] == "not_found"
# --- Test References ---
@pytest.mark.asyncio
async def test_add_reference(kb_manager, sample_components, sample_index_obj):
"""Test adding references between documents."""
mock_doc_store = kb_manager.document_store
mock_search_service = kb_manager.search_service
# Create a reference target
ref_components = PathComponents.parse_path("other/coll/doc2")
ref_index = DocumentIndex(
namespace=ref_components.namespace,
collection=ref_components.collection,
name=ref_components.name,
metadata={},
preferences=[],
references=[],
referenced_by=[]
)
# Mock existing references in sample_index_obj
sample_index_obj.references = [ImplicitRDFTriple(predicate="references", object="kb://other/coll/doc2")]
sample_index_obj.referenced_by = []
# Mock existing data for reference target
ref_index.references = []
ref_index.referenced_by = []
# Set up read_index to return the appropriate index based on components
def read_index_side_effect(components):
if components == sample_components:
return sample_index_obj
elif components == ref_components:
return ref_index
else:
raise FileNotFoundError("Document not found")
mock_doc_store.read_index.side_effect = read_index_side_effect
mock_doc_store.update_index.return_value = sample_index_obj
# Call the method
result = await kb_manager.add_reference(sample_components, ref_components, "cites")
# Verify calls - now expects 2 calls because of bidirectional updates
assert mock_doc_store.read_index.call_count == 2 # Reads both source and target
assert mock_doc_store.update_index.call_count == 2 # Updates both source and target
# Verify search service call
mock_search_service.add_triple_to_indices.assert_awaited_once_with(
sample_components.urn, "cites", ref_components.urn, "reference"
)
# Verify the result
assert result["status"] == "success"
assert result["added"] is True
@pytest.mark.asyncio
async def test_remove_reference(kb_manager, sample_components, sample_index_obj):
"""Test removing references between documents."""
mock_doc_store = kb_manager.document_store
mock_search_service = kb_manager.search_service
# Create a reference target
ref_components = PathComponents.parse_path("other/coll/doc2")
ref_index = DocumentIndex(
namespace=ref_components.namespace,
collection=ref_components.collection,
name=ref_components.name,
metadata={},
preferences=[],
references=[],
referenced_by=[]
)
# Mock existing reference to remove
ref_to_remove = ImplicitRDFTriple(predicate="cites", object="kb://other/coll/doc2")
sample_index_obj.references = [ref_to_remove]
sample_index_obj.referenced_by = []
# Mock reverse reference in target document
reverse_ref = ImplicitRDFTriple(predicate="cites", object="kb://ns/coll/doc1")
ref_index.references = []
ref_index.referenced_by = [reverse_ref]
# Set up read_index to return the appropriate index based on components
def read_index_side_effect(components):
if components == sample_components:
return sample_index_obj
elif components == ref_components:
return ref_index
else:
raise FileNotFoundError("Document not found")
mock_doc_store.read_index.side_effect = read_index_side_effect
# Mock the updated index returned after updates
updated_index = DocumentIndex(
namespace=sample_components.namespace,
collection=sample_components.collection,
name=sample_components.name,
metadata={},
preferences=[],
references=[],
referenced_by=[]
)
mock_doc_store.update_index.return_value = updated_index
# Call the method
result = await kb_manager.remove_reference(sample_components, ref_components, "cites")
# Verify calls - now expects reads of both source and target
assert mock_doc_store.read_index.call_count == 2 # Reads both source and target
assert mock_doc_store.update_index.call_count == 2 # Updates both source and target for bidirectional refs
# Verify search service call
mock_search_service.delete_triple_from_indices.assert_awaited_once_with(
sample_components.urn, "cites", ref_components.urn, "reference"
)
# Verify the result
assert result["status"] == "updated"
assert result["reference_count"] == 0 # Should return count from updated index
# --- Test Search ---
@pytest.mark.asyncio
async def test_search_sparse_only(kb_manager, sample_components):
"""Test search using sparse search only."""
mock_search_service = kb_manager.search_service
query = "test query"
# Mock the search service results
search_results = [{"urn": "kb://ns/coll/doc1", "sparse_score": 0.9}]
mock_search_service.search.return_value = search_results
# Call search with only query (sparse search only)
results = await kb_manager.search(
query=query,
top_k_rerank=5,
include_index=True
)
# Verify search service was called
mock_search_service.search.assert_awaited_once_with(
query=query,
top_k_rerank=5,
include_index=True
)
# Verify results
assert results == search_results
@pytest.mark.asyncio
async def test_search_graph_expansion_only(kb_manager):
"""Test search using graph expansion only."""
mock_search_service = kb_manager.search_service
# Mock graph expansion results
initial_urns = ["kb://ns/coll/doc1"]
search_results = [{"urn": "kb://ns/coll/doc2"}, {"urn": "kb://ns/coll/doc3"}]
mock_search_service.search.return_value = search_results
# Call search with a minimal query to satisfy the validation bug,
# but focus on graph expansion with seed URNs
results = await kb_manager.search(
query="*", # Minimal query to work around validation bug
graph_seed_urns=initial_urns,
graph_expand_hops=1,
)
# Verify graph expansion was called
mock_search_service.search.assert_awaited_once_with(
query="*",
graph_seed_urns=initial_urns,
graph_expand_hops=1,
)
# Verify results include the expanded neighbors
assert results == search_results
@pytest.mark.asyncio
async def test_search_disabled(kb_manager_search_disabled):
"""Test that search raises error when disabled."""
kb_manager_search_disabled.search_enabled = False
# Verify error is raised
with pytest.raises(RuntimeError, match="Search is disabled or not initialized."):
await kb_manager_search_disabled.search(query="test")
@pytest.mark.asyncio
async def test_search_no_query_or_filter(kb_manager):
"""Test search requires query or filter."""
# The validation is now inside the SearchService, so we just test the call.
# We expect the search service to raise the error, which we can mock.
mock_search_service = kb_manager.search_service
mock_search_service.search.side_effect = ValueError("Search requires either a query or filter_urns.")
with pytest.raises(ValueError, match="Search requires either a query or filter_urns"):
await kb_manager.search()
@pytest.mark.asyncio
async def test_search_contract():
"""Test search contract compliance."""
# Create a mock manager
mock_manager = MagicMock()
async def mock_search(query=None, graph_seed_urns=None, graph_expand_hops=0,
relation_predicates=None, top_k_sparse=50, top_k_rerank=10,
filter_urns=None, include_content=False, include_index=False,
use_reranker=True, fuzzy_distance=0):
# Verify at least one search criteria is provided (this is what the validation SHOULD be)
if not query and not graph_seed_urns:
raise ValueError("Search requires either a query or graph_seed_urns")
return []
mock_manager.search = mock_search
# Test contract with query
result = await mock_manager.search(query="test")
assert isinstance(result, list)
# Test contract with filter URNs (which is for filtering out, but still needs a query or seed URNs)
result = await mock_manager.search(query="test", filter_urns=["kb://ns/coll/exclude"])
assert isinstance(result, list)
# Test contract with graph seed URNs for graph expansion
result = await mock_manager.search(graph_seed_urns=["kb://ns/coll/seed"])
assert isinstance(result, list)
# Test error when neither query nor graph_seed_urns provided
with pytest.raises(ValueError, match="Search requires either a query or graph_seed_urns"):
await mock_manager.search()
# --- Test Missing Methods ---
@pytest.mark.asyncio
async def test_update_metadata(kb_manager, sample_components, sample_index_obj):
"""Test updating document metadata."""
mock_doc_store = kb_manager.document_store
# Setup mocks
mock_doc_store.read_index.return_value = sample_index_obj
mock_doc_store.update_index.return_value = sample_index_obj
# Test data
metadata_update = {"author": "test_author", "version": "1.0"}
# Call the method
result = await kb_manager.update_metadata(sample_components, metadata_update)
# Verify calls
mock_doc_store.read_index.assert_called_once_with(sample_components)
mock_doc_store.update_index.assert_called_once()
# Verify the update call includes the merged metadata
update_call_args = mock_doc_store.update_index.call_args[0][1]
assert "metadata" in update_call_args
assert result == sample_index_obj
@pytest.mark.asyncio
async def test_update_metadata_not_found(kb_manager, sample_components):
"""Test updating metadata for non-existent document."""
mock_doc_store = kb_manager.document_store
# Setup mocks
mock_doc_store.read_index.side_effect = FileNotFoundError("Document not found")
# Test data
metadata_update = {"author": "test_author"}
# Verify error is raised
with pytest.raises(FileNotFoundError, match="Document not found"):
await kb_manager.update_metadata(sample_components, metadata_update)
@pytest.mark.asyncio
async def test_check_index(kb_manager, sample_components):
"""Test checking if document index exists."""
mock_doc_store = kb_manager.document_store
# Setup mocks
mock_doc_store.check_index.return_value = True
# Call the method
result = await kb_manager.check_index(sample_components)
# Verify calls
mock_doc_store.check_index.assert_called_once_with(sample_components)
assert result is True
@pytest.mark.asyncio
async def test_check_content(kb_manager, sample_components):
"""Test checking if document content exists."""
mock_doc_store = kb_manager.document_store
# Setup mocks
mock_doc_store.check_content.return_value = True
# Call the method
result = await kb_manager.check_content(sample_components)
# Verify calls
mock_doc_store.check_content.assert_called_once_with(sample_components)
assert result is True
@pytest.mark.asyncio
async def test_list_documents(kb_manager):
"""Test listing documents."""
mock_doc_store = kb_manager.document_store
# Setup mocks
mock_doc_store.find_documents_recursive.return_value = ["ns/coll/doc1", "ns/coll/doc2"]
mock_doc_store.find_documents_shallow.return_value = ["ns/coll/doc1"]
# Test recursive listing (default)
result = await kb_manager.list_documents()
mock_doc_store.find_documents_recursive.assert_called_once()
assert result == ["ns/coll/doc1", "ns/coll/doc2"]
# Reset mocks
mock_doc_store.reset_mock()
# Test shallow listing
result = await kb_manager.list_documents(recursive=False)
mock_doc_store.find_documents_shallow.assert_called_once()
assert result == ["ns/coll/doc1"]
@pytest.mark.asyncio
async def test_move_document(kb_manager, sample_components, sample_index_obj):
"""Test moving a document to a new location."""
mock_doc_store = kb_manager.document_store
mock_search_service = kb_manager.search_service
# Create new target components
new_components = PathComponents.parse_path("newns/newcoll/newname")
# Setup the sample index object to have the correct URN
# We can't set urn directly since it's a property, so we'll use the sample_components as-is
mock_doc_store.read_index.return_value = sample_index_obj
mock_doc_store.move_document.return_value = new_components
# Mock empty references to avoid reference update logic
sample_index_obj.references = []
sample_index_obj.referenced_by = []
# Mock content reading for search index updates
mock_doc_store.read_content.return_value = "Test content"
# Call the method
result = await kb_manager.move_document(sample_components, new_components)
# Verify calls - move operation reads the index multiple times
assert mock_doc_store.read_index.call_count >= 1 # Called at least once
mock_doc_store.move_document.assert_called_once_with(sample_components, new_components)
# Verify result
assert result == sample_index_obj # Returns the index object
# Verify search index update was attempted (if search is enabled)
if kb_manager.search_enabled:
mock_search_service.move_document_in_indices.assert_awaited_once_with(
sample_components.urn, new_components.urn, "Test content"
)
@pytest.mark.asyncio
async def test_move_document_not_found(kb_manager, sample_components):
"""Test moving a non-existent document."""
mock_doc_store = kb_manager.document_store
# Setup mocks
mock_doc_store.read_index.side_effect = FileNotFoundError("Document not found")
new_components = PathComponents.parse_path("newns/newcoll/newname")
# Verify error is raised
with pytest.raises(FileNotFoundError, match="Document not found"):
await kb_manager.move_document(sample_components, new_components)
@pytest.mark.asyncio
async def test_archive_document(kb_manager, sample_components, sample_index_obj):
"""Test archiving a document."""
mock_doc_store = kb_manager.document_store
mock_search_service = kb_manager.search_service
# Setup mocks
mock_doc_store.check_index.return_value = True
mock_doc_store.read_index.return_value = sample_index_obj
# Mock empty references to avoid reference cleanup logic
sample_index_obj.references = []
sample_index_obj.referenced_by = []
# Call the method
result = await kb_manager.archive_document(sample_components)
# Verify search service was called to delete the doc
if kb_manager.search_enabled:
mock_search_service.delete_document_from_indices.assert_awaited_once_with(
sample_components.urn
)
# Verify result
assert result["status"] == "archived"
assert "archive_path" in result
@pytest.mark.asyncio
async def test_archive_document_not_found(kb_manager, sample_components):
"""Test archiving a non-existent document."""
mock_doc_store = kb_manager.document_store
# Setup mocks
mock_doc_store.check_index.return_value = False
# Call the method
result = await kb_manager.archive_document(sample_components)
# Verify result
assert result["status"] == "not_found"
assert "Document not found" in result["message"]
@pytest.mark.asyncio
async def test_add_preference(kb_manager, sample_components, sample_index_obj):
"""Test adding preferences to a document."""
mock_doc_store = kb_manager.document_store
mock_search_service = kb_manager.search_service
# Setup mocks
mock_doc_store.read_index.return_value = sample_index_obj
mock_doc_store.update_index.return_value = sample_index_obj
# Mock initial empty preferences
sample_index_obj.preferences = []
# Test data
preferences = [ImplicitRDFTriple(predicate="hasTag", object="important")]
# Call the method
result = await kb_manager.add_preference(sample_components, preferences)
# Verify calls
mock_doc_store.read_index.assert_called_once_with(sample_components)
mock_doc_store.update_index.assert_called_once()
# Verify search call
mock_search_service.add_triple_to_indices.assert_awaited_once_with(
sample_components.urn, "hasTag", "important", "preference"
)
assert result["status"] == "updated"
@pytest.mark.asyncio
async def test_remove_preference(kb_manager, sample_components, sample_index_obj):
"""Test removing preferences from a document."""
mock_doc_store = kb_manager.document_store
mock_search_service = kb_manager.search_service
# Create preferences to remove
pref_to_remove = ImplicitRDFTriple(predicate="hasTag", object="remove_me")
pref_to_keep = ImplicitRDFTriple(predicate="hasTag", object="keep_me")
# Mock existing preferences
sample_index_obj.preferences = [pref_to_remove, pref_to_keep]
mock_doc_store.read_index.return_value = sample_index_obj
mock_doc_store.update_index.return_value = sample_index_obj
# Call the method
result = await kb_manager.remove_preference(sample_components, [pref_to_remove])
# Verify calls
mock_doc_store.read_index.assert_called_once_with(sample_components)
mock_doc_store.update_index.assert_called_once()
# Verify search call
mock_search_service.delete_triple_from_indices.assert_awaited_once_with(
sample_components.urn, "hasTag", "remove_me", "preference"
)
assert result["status"] == "updated"
@pytest.mark.asyncio
async def test_remove_all_preferences(kb_manager, sample_components, sample_index_obj):
"""Test removing all preferences from a document."""
mock_doc_store = kb_manager.document_store
mock_search_service = kb_manager.search_service
# Setup mocks
sample_index_obj.preferences = [ImplicitRDFTriple("p1", "o1"), ImplicitRDFTriple("p2", "o2")]
mock_doc_store.read_index.return_value = sample_index_obj
mock_doc_store.update_index.return_value = sample_index_obj
# Call the method
result = await kb_manager.remove_all_preferences(sample_components)
# Verify calls
mock_doc_store.read_index.assert_called_once_with(sample_components)
mock_doc_store.update_index.assert_called_once_with(sample_components, {"preferences": []})
# Verify search calls
assert mock_search_service.delete_triple_from_indices.await_count == 2
mock_search_service.delete_triple_from_indices.assert_any_await("kb://ns/coll/doc1", "p1", "o1", "preference")
mock_search_service.delete_triple_from_indices.assert_any_await("kb://ns/coll/doc1", "p2", "o2", "preference")
assert result["status"] == "updated"
@pytest.mark.asyncio
async def test_recover_search_indices(kb_manager):
"""Test recovering search indices."""
mock_search_service = kb_manager.search_service
# Mock the recovery methods
mock_search_service.recover_indices.return_value = {
"sparse_index": {"status": "recovered"},
"graph_index": {"status": "recovered"},
}
# Call the method
result = await kb_manager.recover_search_indices()
# Verify that the search service was called
mock_search_service.recover_indices.assert_awaited_once_with(rebuild_all=False)
# Verify result structure
assert "sparse_index" in result
assert "graph_index" in result
assert result["sparse_index"]["status"] == "recovered"
assert result["graph_index"]["status"] == "recovered"
@pytest.mark.asyncio
async def test_recover_search_indices_disabled(kb_manager_search_disabled):
"""Test recovering search indices when search is disabled."""
# Verify error is raised
with pytest.raises(RuntimeError, match="Search is disabled or not initialized."):
await kb_manager_search_disabled.recover_search_indices()
@pytest.mark.asyncio
async def test_from_env():
"""Test creating KnowledgeBaseManager from environment configuration."""
with patch('cmcp.config.load_config') as mock_load_config:
# Mock config
mock_config = MagicMock()
mock_config.kb_config.storage_path = "/test/path"
mock_config.kb_config.timeout_default = 30
mock_config.kb_config.timeout_max = 300
mock_config.kb_config.search_enabled = True
mock_config.kb_config.sparse_index_path = "/test/sparse"
mock_config.kb_config.graph_index_path = "/test/graph"
mock_config.kb_config.reranker_model = "test-model"
mock_config.kb_config.search_relation_predicates = ["references"]
mock_config.kb_config.search_graph_neighbor_limit = 1000
mock_load_config.return_value = mock_config
# Call the method
manager = KnowledgeBaseManager.from_env()
# Verify configuration was used
assert manager.storage_path == "/test/path"
assert manager.timeout_default == 30
assert manager.timeout_max == 300
assert manager.search_enabled is True
assert manager.search_graph_neighbor_limit == 1000
@pytest.mark.asyncio
async def test_initialize():
"""Test initializing the knowledge base manager."""
with patch('os.makedirs'), \
patch('cmcp.managers.knowledge_base_manager.DocumentStore'), \
patch('cmcp.managers.knowledge_base_manager.SearchService'), \
patch('os.path.exists', return_value=True):
# Create manager with search enabled
manager = KnowledgeBaseManager(
storage_path="/test/path",
timeout_default=30,
timeout_max=300,
search_enabled=True,
sparse_index_path="/test/sparse",
graph_index_path="/test/graph",
reranker_model="test-model"
)
await manager.initialize()
# No need for asserts here, if it initializes without error, the test passes.
# The internal workings are mocked.
pass
# --- Existing Contract Tests (these should remain as they test the API contract) ---
@pytest.mark.asyncio
async def test_create_document_contract():
"""Test create_document contract compliance."""
# Create a mock manager that accepts the expected arguments
mock_manager = MagicMock()
async def mock_create_document(components, metadata=None):
# Verify the arguments match what we expect
assert hasattr(components, 'namespace')
assert hasattr(components, 'collection')
assert hasattr(components, 'name')
assert metadata is None or isinstance(metadata, dict)
# Return a mock DocumentIndex
return MagicMock()
mock_manager.create_document = mock_create_document
# Test with minimal arguments
components = PathComponents.parse_path("ns/coll/name")
result = await mock_manager.create_document(components)
assert result is not None
# Test with metadata
result = await mock_manager.create_document(components, {"key": "value"})
assert result is not None
@pytest.mark.asyncio
async def test_read_content_contract():
"""Test read_content contract compliance."""
# Create a mock manager
mock_manager = MagicMock()
async def mock_read_content(components):
# Verify the arguments match what we expect
assert hasattr(components, 'namespace')
assert hasattr(components, 'collection')
assert hasattr(components, 'name')
return "test content"
mock_manager.read_content = mock_read_content
# Test contract
components = PathComponents.parse_path("ns/coll/name")
result = await mock_manager.read_content(components)
assert result == "test content"
@pytest.mark.asyncio
async def test_read_index_contract():
"""Test read_index contract compliance."""
# Create a mock manager
mock_manager = MagicMock()
async def mock_read_index(components):
# Verify the arguments match what we expect
assert hasattr(components, 'namespace')
assert hasattr(components, 'collection')
assert hasattr(components, 'name')
# Return a mock DocumentIndex
return MagicMock()
mock_manager.read_index = mock_read_index
# Test contract
components = PathComponents.parse_path("ns/coll/name")
result = await mock_manager.read_index(components)
assert result is not None
@pytest.mark.asyncio
async def test_delete_document_contract():
"""Test delete_document contract compliance."""
# Create a mock manager
mock_manager = MagicMock()
async def mock_delete_document(components):
# Verify the arguments match what we expect
assert hasattr(components, 'namespace')
assert hasattr(components, 'collection')
assert hasattr(components, 'name')
return {"status": "deleted"}
mock_manager.delete_document = mock_delete_document
# Test contract
components = PathComponents.parse_path("ns/coll/name")
result = await mock_manager.delete_document(components)
assert result["status"] == "deleted"
@pytest.mark.asyncio
async def test_add_reference_contract():
"""Test add_reference contract compliance."""
# Create a mock manager
mock_manager = MagicMock()
async def mock_add_reference(components, ref_components, relation):
# Verify arguments match what we expect
assert hasattr(components, 'namespace')
assert hasattr(components, 'collection')
assert hasattr(components, 'name')
assert hasattr(ref_components, 'namespace')
assert hasattr(ref_components, 'collection')
assert hasattr(ref_components, 'name')
assert isinstance(relation, str)
return {"status": "success", "added": True}
mock_manager.add_reference = mock_add_reference
# Test contract
components = PathComponents.parse_path("ns/coll/name")
ref_components = PathComponents.parse_path("ns/coll/ref")
result = await mock_manager.add_reference(components, ref_components, "references")
assert result["status"] == "success"
@pytest.mark.asyncio
async def test_remove_reference_contract():
"""Test that remove_reference contract is maintained."""
from cmcp.managers.knowledge_base_manager import KnowledgeBaseManager
import inspect
# Get the signature of the method
sig = inspect.signature(KnowledgeBaseManager.remove_reference)
params = list(sig.parameters.keys())
# Contract for remove_reference: self, components, ref_components, relation
expected_params = ['self', 'components', 'ref_components', 'relation']
assert params == expected_params, f"Expected {expected_params}, got {params}"
# Mock implementation to test the calls
async def mock_remove_reference(components, ref_components, relation):
# Verify arguments match what we expect
assert isinstance(components, PathComponents)
assert isinstance(ref_components, PathComponents)
assert isinstance(relation, str)
return {"status": "updated", "reference_count": 0}
# Run our test with the mock
manager = MagicMock()
manager.remove_reference = mock_remove_reference
components = PathComponents.parse_path("ns/coll/doc1")
ref_components = PathComponents.parse_path("ns/coll/doc2")
relation = "references"
result = await manager.remove_reference(components, ref_components, relation)
assert result["status"] == "updated"
# --- Test New Metadata Property Functions ---
@pytest.mark.asyncio
async def test_add_metadata_property(kb_manager, sample_components, sample_index_obj):
"""Test adding a single metadata property to a document."""
mock_doc_store = kb_manager.document_store
# Setup mocks
mock_doc_store.read_index.return_value = sample_index_obj
# Prepare updated index with new metadata
updated_index = sample_index_obj.model_copy(deep=True)
updated_index.metadata = {"existing": "value", "author": "test_author"}
mock_doc_store.update_index.return_value = updated_index
# Test data
key = "author"
value = "test_author"
# Call the method
result = await kb_manager.add_metadata_property(sample_components, key, value)
# Verify calls
mock_doc_store.read_index.assert_called_once_with(sample_components)
mock_doc_store.update_index.assert_called_once()
# Verify the update call includes the new metadata property
update_call_args = mock_doc_store.update_index.call_args[0][1]
assert "metadata" in update_call_args
assert update_call_args["metadata"][key] == value
# Verify result
assert result["status"] == "updated"
assert result["key"] == key
assert result["value"] == value
assert result["metadata_count"] == len(updated_index.metadata)
@pytest.mark.asyncio
async def test_add_metadata_property_not_found(kb_manager, sample_components):
"""Test adding metadata property to non-existent document."""
mock_doc_store = kb_manager.document_store
# Setup mocks
mock_doc_store.read_index.side_effect = FileNotFoundError("Document not found")
# Test data
key = "author"
value = "test_author"
# Verify error is raised
with pytest.raises(FileNotFoundError, match="Document not found"):
await kb_manager.add_metadata_property(sample_components, key, value)
@pytest.mark.asyncio
async def test_remove_metadata_property(kb_manager, sample_components, sample_index_obj):
"""Test removing a metadata property from a document."""
mock_doc_store = kb_manager.document_store
# Set up sample index with existing metadata
sample_index_obj.metadata = {"author": "test_author", "version": "1.0"}
mock_doc_store.read_index.return_value = sample_index_obj
# Prepare updated index with removed metadata
updated_index = sample_index_obj.model_copy(deep=True)
updated_index.metadata = {"version": "1.0"} # author removed
mock_doc_store.update_index.return_value = updated_index
# Test data
key = "author"
# Call the method
result = await kb_manager.remove_metadata_property(sample_components, key)
# Verify calls
mock_doc_store.read_index.assert_called_once_with(sample_components)
mock_doc_store.update_index.assert_called_once()
# Verify the update call excludes the removed metadata property
update_call_args = mock_doc_store.update_index.call_args[0][1]
assert "metadata" in update_call_args
assert key not in update_call_args["metadata"]
# Verify result
assert result["status"] == "updated"
assert result["key"] == key
assert result["removed"] is True
assert result["metadata_count"] == len(updated_index.metadata)
@pytest.mark.asyncio
async def test_remove_metadata_property_not_exists(kb_manager, sample_components, sample_index_obj):
"""Test removing a non-existent metadata property."""
mock_doc_store = kb_manager.document_store
# Set up sample index with existing metadata (no "nonexistent" key)
sample_index_obj.metadata = {"author": "test_author", "version": "1.0"}
mock_doc_store.read_index.return_value = sample_index_obj
# Index should remain unchanged
updated_index = sample_index_obj.model_copy(deep=True)
mock_doc_store.update_index.return_value = updated_index
# Test data
key = "nonexistent"
# Call the method
result = await kb_manager.remove_metadata_property(sample_components, key)
# Verify calls
mock_doc_store.read_index.assert_called_once_with(sample_components)
mock_doc_store.update_index.assert_called_once()
# Verify result
assert result["status"] == "unchanged"
assert result["key"] == key
assert result["removed"] is False
assert result["metadata_count"] == len(sample_index_obj.metadata)
@pytest.mark.asyncio
async def test_remove_metadata_property_not_found(kb_manager, sample_components):
"""Test removing metadata property from non-existent document."""
mock_doc_store = kb_manager.document_store
# Setup mocks
mock_doc_store.read_index.side_effect = FileNotFoundError("Document not found")
# Test data
key = "author"
# Verify error is raised
with pytest.raises(FileNotFoundError, match="Document not found"):
await kb_manager.remove_metadata_property(sample_components, key)
@pytest.mark.asyncio
async def test_add_metadata_property_contract():
"""Test that add_metadata_property contract is maintained."""
from cmcp.managers.knowledge_base_manager import KnowledgeBaseManager
import inspect
# Get the signature of the method
sig = inspect.signature(KnowledgeBaseManager.add_metadata_property)
params = list(sig.parameters.keys())
# Contract for add_metadata_property: self, components, key, value
expected_params = ['self', 'components', 'key', 'value']
assert params == expected_params, f"Expected {expected_params}, got {params}"
# Mock implementation to test the calls
async def mock_add_metadata_property(components, key, value):
# Verify arguments match what we expect
assert isinstance(components, PathComponents)
assert isinstance(key, str)
# value can be any type
return {"status": "updated", "metadata_count": 1, "key": key, "value": value}
# Run our test with the mock
manager = MagicMock()
manager.add_metadata_property = mock_add_metadata_property
components = PathComponents.parse_path("ns/coll/doc1")
key = "author"
value = "test_author"
result = await manager.add_metadata_property(components, key, value)
assert result["status"] == "updated"
assert result["key"] == key
assert result["value"] == value
@pytest.mark.asyncio
async def test_remove_metadata_property_contract():
"""Test that remove_metadata_property contract is maintained."""
from cmcp.managers.knowledge_base_manager import KnowledgeBaseManager
import inspect
# Get the signature of the method
sig = inspect.signature(KnowledgeBaseManager.remove_metadata_property)
params = list(sig.parameters.keys())
# Contract for remove_metadata_property: self, components, key
expected_params = ['self', 'components', 'key']
assert params == expected_params, f"Expected {expected_params}, got {params}"
# Mock implementation to test the calls
async def mock_remove_metadata_property(components, key):
# Verify arguments match what we expect
assert isinstance(components, PathComponents)
assert isinstance(key, str)
return {"status": "updated", "metadata_count": 0, "key": key, "removed": True}
# Run our test with the mock
manager = MagicMock()
manager.remove_metadata_property = mock_remove_metadata_property
components = PathComponents.parse_path("ns/coll/doc1")
key = "author"
result = await manager.remove_metadata_property(components, key)
assert result["status"] == "updated"
assert result["key"] == key
assert result["removed"] is True