Skip to main content
Glama
kzmshx
by kzmshx
test_server.py (25.8 kB)
"""Tests for MCP server module.""" import datetime import tempfile from pathlib import Path from unittest.mock import MagicMock import frontmatter import numpy as np import pytest import frontmatter_mcp.dependencies as deps import frontmatter_mcp.server as server_module from frontmatter_mcp.settings import get_settings def _reset_caches() -> None: """Reset all singleton caches for testing.""" deps.reset_caches() get_settings.cache_clear() def _settings_dep() -> dict: """Get settings dependency for tool functions that only need settings.""" return {"settings": deps.get_settings()} def _query_deps() -> dict: """Get dependencies for query/query_inspect tools.""" return { "settings": deps.get_settings(), "cache": deps.get_file_record_cache(), "semantic_ctx": deps.get_semantic_ctx(), } def _semantic_dep() -> dict: """Get semantic_ctx dependency for index_* tools.""" return {"semantic_ctx": deps.get_semantic_ctx()} @pytest.fixture def temp_base_dir(monkeypatch: pytest.MonkeyPatch): """Create a temporary directory with test markdown files.""" with tempfile.TemporaryDirectory() as tmpdir: base = Path(tmpdir) # Create test files (base / "a.md").write_text( """--- date: 2025-11-27 tags: [python, mcp] --- # File A """ ) (base / "b.md").write_text( """--- date: 2025-11-26 tags: [duckdb] --- # File B """ ) (base / "subdir").mkdir() (base / "subdir" / "c.md").write_text( """--- date: 2025-11-25 tags: [python] summary: A summary --- # File C """ ) # Set base_dir via environment variable and reset caches monkeypatch.setenv("FRONTMATTER_BASE_DIR", str(base)) _reset_caches() yield base _reset_caches() class TestQueryInspect: """Tests for query_inspect tool.""" def test_basic_schema(self, temp_base_dir: Path) -> None: """Get schema from files.""" result = server_module.query_inspect.fn(glob="*.md", **_query_deps()) assert result["file_count"] == 2 assert "date" in result["schema"] assert "tags" in result["schema"] def test_recursive_glob(self, temp_base_dir: Path) -> None: """Get 
schema with recursive glob.""" result = server_module.query_inspect.fn(glob="**/*.md", **_query_deps()) assert result["file_count"] == 3 assert "summary" in result["schema"] class TestQuery: """Tests for query tool.""" def test_select_all(self, temp_base_dir: Path) -> None: """Select all files.""" result = server_module.query.fn( glob="**/*.md", sql="SELECT path FROM files ORDER BY path", **_query_deps() ) assert result["row_count"] == 3 assert "path" in result["columns"] def test_where_clause(self, temp_base_dir: Path) -> None: """Filter by date.""" result = server_module.query.fn( glob="**/*.md", sql="SELECT path FROM files WHERE date >= '2025-11-26'", **_query_deps(), ) assert result["row_count"] == 2 paths = [r["path"] for r in result["results"]] assert "a.md" in paths assert "b.md" in paths def test_tag_contains(self, temp_base_dir: Path) -> None: """Filter by tag using from_json.""" result = server_module.query.fn( glob="**/*.md", sql="""SELECT path FROM files WHERE list_contains(from_json(tags, '["VARCHAR"]'), 'python')""", **_query_deps(), ) assert result["row_count"] == 2 def test_tag_aggregation(self, temp_base_dir: Path) -> None: """Aggregate tags using from_json.""" result = server_module.query.fn( glob="**/*.md", sql=""" SELECT tag, COUNT(*) AS count FROM files, UNNEST(from_json(tags, '["VARCHAR"]')) AS t(tag) GROUP BY tag ORDER BY count DESC """, **_query_deps(), ) assert result["row_count"] == 3 assert result["results"][0]["tag"] == "python" assert result["results"][0]["count"] == 2 class TestUpdate: """Tests for update tool.""" def test_set_property(self, temp_base_dir: Path) -> None: """Set a property on a file.""" result = server_module.update.fn( path="a.md", set={"status": "published"}, **_settings_dep() ) assert result["path"] == "a.md" assert result["frontmatter"]["status"] == "published" assert result["frontmatter"]["date"] == datetime.date(2025, 11, 27) def test_unset_property(self, temp_base_dir: Path) -> None: """Unset a property from a 
file.""" result = server_module.update.fn(path="b.md", unset=["tags"], **_settings_dep()) assert "tags" not in result["frontmatter"] def test_set_and_unset(self, temp_base_dir: Path) -> None: """Set and unset properties.""" result = server_module.update.fn( path="subdir/c.md", set={"status": "done"}, unset=["summary"], **_settings_dep(), ) assert result["path"] == "subdir/c.md" assert result["frontmatter"]["status"] == "done" assert "summary" not in result["frontmatter"] def test_file_not_found(self, temp_base_dir: Path) -> None: """Error when file does not exist.""" with pytest.raises(FileNotFoundError): server_module.update.fn( path="nonexistent.md", set={"x": 1}, **_settings_dep() ) def test_path_outside_base_dir(self, temp_base_dir: Path) -> None: """Error when path is outside base_dir.""" with pytest.raises(ValueError): server_module.update.fn( path="../outside.md", set={"x": 1}, **_settings_dep() ) class TestBatchUpdate: """Tests for batch_update tool.""" def test_set_property_all_files(self, temp_base_dir: Path) -> None: """Set a property on all matching files.""" result = server_module.batch_update.fn( glob="*.md", set={"status": "reviewed"}, **_settings_dep() ) assert result["updated_count"] == 2 assert "a.md" in result["updated_files"] assert "b.md" in result["updated_files"] post = frontmatter.load(temp_base_dir / "a.md") assert post["status"] == "reviewed" def test_recursive_glob(self, temp_base_dir: Path) -> None: """Update all files including subdirectories.""" result = server_module.batch_update.fn( glob="**/*.md", set={"batch": True}, **_settings_dep() ) assert result["updated_count"] == 3 assert "subdir/c.md" in result["updated_files"] def test_unset_property(self, temp_base_dir: Path) -> None: """Unset a property from all matching files.""" result = server_module.batch_update.fn( glob="**/*.md", unset=["tags"], **_settings_dep() ) assert result["updated_count"] == 3 post = frontmatter.load(temp_base_dir / "a.md") assert "tags" not in post.keys() 
def test_set_and_unset(self, temp_base_dir: Path) -> None: """Set and unset properties in batch.""" result = server_module.batch_update.fn( glob="**/*.md", set={"new_prop": "value"}, unset=["date"], **_settings_dep(), ) assert result["updated_count"] == 3 post = frontmatter.load(temp_base_dir / "b.md") assert post["new_prop"] == "value" assert "date" not in post.keys() def test_no_matching_files(self, temp_base_dir: Path) -> None: """Handle no matching files gracefully.""" result = server_module.batch_update.fn( glob="*.txt", set={"x": 1}, **_settings_dep() ) assert result["updated_count"] == 0 assert result["updated_files"] == [] def test_no_warnings_key_when_success(self, temp_base_dir: Path) -> None: """Warnings key is absent when all updates succeed.""" result = server_module.batch_update.fn( glob="*.md", set={"status": "ok"}, **_settings_dep() ) assert result["updated_count"] == 2 assert "warnings" not in result def test_warnings_on_malformed_frontmatter(self, temp_base_dir: Path) -> None: """Warnings are populated when file has malformed frontmatter.""" # Create a file with malformed YAML frontmatter (temp_base_dir / "malformed.md").write_text( "---\ninvalid: [unclosed\n---\n# Content" ) result = server_module.batch_update.fn( glob="*.md", set={"status": "ok"}, **_settings_dep() ) # a.md and b.md should succeed, malformed.md should fail assert result["updated_count"] == 2 assert "warnings" in result assert len(result["warnings"]) == 1 assert "malformed.md" in result["warnings"][0] class TestBatchArrayAdd: """Tests for batch_array_add tool.""" def test_add_value_to_existing_array(self, temp_base_dir: Path) -> None: """Add a value to an existing array property.""" result = server_module.batch_array_add.fn( glob="*.md", property="tags", value="new-tag", **_settings_dep() ) assert result["updated_count"] == 2 assert "a.md" in result["updated_files"] post = frontmatter.load(temp_base_dir / "a.md") assert "new-tag" in post["tags"] def 
test_skip_duplicate_value(self, temp_base_dir: Path) -> None: """Skip files where value already exists (allow_duplicates=False).""" result = server_module.batch_array_add.fn( glob="*.md", property="tags", value="python", **_settings_dep() ) # a.md has [python, mcp], b.md has [duckdb] # a.md is skipped (python already exists), b.md is updated assert result["updated_count"] == 1 assert "b.md" in result["updated_files"] def test_allow_duplicates(self, temp_base_dir: Path) -> None: """Allow duplicate values when allow_duplicates=True.""" result = server_module.batch_array_add.fn( glob="*.md", property="tags", value="python", allow_duplicates=True, **_settings_dep(), ) assert result["updated_count"] == 2 post = frontmatter.load(temp_base_dir / "a.md") assert post["tags"].count("python") == 2 def test_create_property_if_not_exists(self, temp_base_dir: Path) -> None: """Create array property if it doesn't exist.""" result = server_module.batch_array_add.fn( glob="*.md", property="categories", value="blog", **_settings_dep() ) assert result["updated_count"] == 2 post = frontmatter.load(temp_base_dir / "a.md") assert post["categories"] == ["blog"] def test_skip_non_array_property(self, temp_base_dir: Path) -> None: """Skip and warn when property is not an array.""" result = server_module.batch_array_add.fn( glob="*.md", property="date", value="value", **_settings_dep() ) assert result["updated_count"] == 0 assert "warnings" in result assert len(result["warnings"]) == 2 def test_value_as_array_not_flattened(self, temp_base_dir: Path) -> None: """Array value should be added as single element, not flattened.""" result = server_module.batch_array_add.fn( glob="*.md", property="tags", value=["nested", "array"], **_settings_dep() ) assert result["updated_count"] == 2 post = frontmatter.load(temp_base_dir / "a.md") assert ["nested", "array"] in post["tags"] class TestBatchArrayRemove: """Tests for batch_array_remove tool.""" def test_remove_value_from_array(self, temp_base_dir: 
Path) -> None: """Remove a value from array property.""" result = server_module.batch_array_remove.fn( glob="**/*.md", property="tags", value="python", **_settings_dep() ) # a.md and c.md have python tag assert result["updated_count"] == 2 post = frontmatter.load(temp_base_dir / "a.md") assert "python" not in post["tags"] def test_skip_if_value_not_exists(self, temp_base_dir: Path) -> None: """Skip files where value doesn't exist.""" result = server_module.batch_array_remove.fn( glob="*.md", property="tags", value="nonexistent", **_settings_dep() ) assert result["updated_count"] == 0 assert "warnings" not in result def test_skip_if_property_not_exists(self, temp_base_dir: Path) -> None: """Skip files where property doesn't exist.""" result = server_module.batch_array_remove.fn( glob="*.md", property="categories", value="value", **_settings_dep() ) assert result["updated_count"] == 0 assert "warnings" not in result def test_skip_non_array_property(self, temp_base_dir: Path) -> None: """Skip and warn when property is not an array.""" result = server_module.batch_array_remove.fn( glob="*.md", property="date", value="value", **_settings_dep() ) assert result["updated_count"] == 0 assert "warnings" in result class TestBatchArrayReplace: """Tests for batch_array_replace tool.""" def test_replace_value_in_array(self, temp_base_dir: Path) -> None: """Replace a value in array property.""" result = server_module.batch_array_replace.fn( glob="**/*.md", property="tags", old_value="python", new_value="py", **_settings_dep(), ) assert result["updated_count"] == 2 post = frontmatter.load(temp_base_dir / "a.md") assert "py" in post["tags"] assert "python" not in post["tags"] def test_skip_if_old_value_not_exists(self, temp_base_dir: Path) -> None: """Skip files where old_value doesn't exist.""" result = server_module.batch_array_replace.fn( glob="*.md", property="tags", old_value="nonexistent", new_value="new", **_settings_dep(), ) assert result["updated_count"] == 0 assert 
"warnings" not in result def test_skip_if_property_not_exists(self, temp_base_dir: Path) -> None: """Skip files where property doesn't exist.""" result = server_module.batch_array_replace.fn( glob="*.md", property="categories", old_value="old", new_value="new", **_settings_dep(), ) assert result["updated_count"] == 0 assert "warnings" not in result def test_skip_non_array_property(self, temp_base_dir: Path) -> None: """Skip and warn when property is not an array.""" result = server_module.batch_array_replace.fn( glob="*.md", property="date", old_value="old", new_value="new", **_settings_dep(), ) assert result["updated_count"] == 0 assert "warnings" in result class TestBatchArraySort: """Tests for batch_array_sort tool.""" def test_sort_array_ascending(self, temp_base_dir: Path) -> None: """Sort array in ascending order.""" result = server_module.batch_array_sort.fn( glob="*.md", property="tags", **_settings_dep() ) # a.md has [python, mcp] -> [mcp, python] (updated) # b.md has [duckdb] (single element, already sorted, skipped) assert result["updated_count"] == 1 assert "a.md" in result["updated_files"] post = frontmatter.load(temp_base_dir / "a.md") assert post["tags"] == ["mcp", "python"] def test_sort_array_descending(self, temp_base_dir: Path) -> None: """Sort array in descending order.""" result = server_module.batch_array_sort.fn( glob="*.md", property="tags", reverse=True, **_settings_dep() ) # a.md has [python, mcp] - already descending order (skipped) # b.md has [duckdb] (single element, already sorted, skipped) assert result["updated_count"] == 0 def test_sort_array_descending_updated(self, temp_base_dir: Path) -> None: """Sort array in descending order when not already sorted.""" # First sort ascending server_module.batch_array_sort.fn( glob="*.md", property="tags", **_settings_dep() ) # Now a.md has [mcp, python], reverse should update it result = server_module.batch_array_sort.fn( glob="*.md", property="tags", reverse=True, **_settings_dep() ) assert 
result["updated_count"] == 1 post = frontmatter.load(temp_base_dir / "a.md") assert post["tags"] == ["python", "mcp"] def test_skip_if_already_sorted(self, temp_base_dir: Path) -> None: """Skip files where array is already sorted.""" # First sort server_module.batch_array_sort.fn( glob="*.md", property="tags", **_settings_dep() ) # Second sort should skip result = server_module.batch_array_sort.fn( glob="*.md", property="tags", **_settings_dep() ) assert result["updated_count"] == 0 def test_skip_empty_array(self, temp_base_dir: Path) -> None: """Skip files with empty array.""" (temp_base_dir / "empty.md").write_text("---\ntags: []\n---\n# Empty") result = server_module.batch_array_sort.fn( glob="empty.md", property="tags", **_settings_dep() ) assert result["updated_count"] == 0 def test_skip_if_property_not_exists(self, temp_base_dir: Path) -> None: """Skip files where property doesn't exist.""" result = server_module.batch_array_sort.fn( glob="*.md", property="categories", **_settings_dep() ) assert result["updated_count"] == 0 assert "warnings" not in result def test_skip_non_array_property(self, temp_base_dir: Path) -> None: """Skip and warn when property is not an array.""" result = server_module.batch_array_sort.fn( glob="*.md", property="date", **_settings_dep() ) assert result["updated_count"] == 0 assert "warnings" in result class TestBatchArrayUnique: """Tests for batch_array_unique tool.""" def test_remove_duplicates(self, temp_base_dir: Path) -> None: """Remove duplicate values from array.""" (temp_base_dir / "dup.md").write_text("---\ntags: [a, b, a, c, b]\n---\n# Dup") result = server_module.batch_array_unique.fn( glob="dup.md", property="tags", **_settings_dep() ) assert result["updated_count"] == 1 assert "dup.md" in result["updated_files"] post = frontmatter.load(temp_base_dir / "dup.md") assert post["tags"] == ["a", "b", "c"] def test_preserve_order(self, temp_base_dir: Path) -> None: """Preserve first occurrence order when removing duplicates.""" 
content = "---\ntags: [z, a, z, m, a]\n---\n# Order" (temp_base_dir / "order.md").write_text(content) result = server_module.batch_array_unique.fn( glob="order.md", property="tags", **_settings_dep() ) assert result["updated_count"] == 1 post = frontmatter.load(temp_base_dir / "order.md") assert post["tags"] == ["z", "a", "m"] def test_skip_if_no_duplicates(self, temp_base_dir: Path) -> None: """Skip files where array has no duplicates.""" result = server_module.batch_array_unique.fn( glob="a.md", property="tags", **_settings_dep() ) # a.md has [python, mcp] - no duplicates assert result["updated_count"] == 0 def test_skip_empty_array(self, temp_base_dir: Path) -> None: """Skip files with empty array.""" (temp_base_dir / "empty.md").write_text("---\ntags: []\n---\n# Empty") result = server_module.batch_array_unique.fn( glob="empty.md", property="tags", **_settings_dep() ) assert result["updated_count"] == 0 def test_skip_single_element(self, temp_base_dir: Path) -> None: """Skip files with single element array.""" result = server_module.batch_array_unique.fn( glob="b.md", property="tags", **_settings_dep() ) # b.md has [duckdb] - single element assert result["updated_count"] == 0 def test_skip_if_property_not_exists(self, temp_base_dir: Path) -> None: """Skip files where property doesn't exist.""" result = server_module.batch_array_unique.fn( glob="*.md", property="categories", **_settings_dep() ) assert result["updated_count"] == 0 assert "warnings" not in result def test_skip_non_array_property(self, temp_base_dir: Path) -> None: """Skip and warn when property is not an array.""" result = server_module.batch_array_unique.fn( glob="*.md", property="date", **_settings_dep() ) assert result["updated_count"] == 0 assert "warnings" in result class TestSemanticSearchTools: """Tests for semantic search tools.""" @pytest.fixture def semantic_base_dir(self, temp_base_dir: Path, monkeypatch: pytest.MonkeyPatch): """Enable semantic search for tests.""" 
monkeypatch.setenv("FRONTMATTER_ENABLE_SEMANTIC", "true") _reset_caches() yield temp_base_dir @pytest.fixture def mock_semantic_context( self, semantic_base_dir: Path, monkeypatch: pytest.MonkeyPatch ): """Create and set up a mock semantic context.""" from frontmatter_mcp.semantic import EmbeddingCache, EmbeddingIndexer from frontmatter_mcp.semantic.context import SemanticContext mock_model = MagicMock() mock_model.name = "test-model" mock_model.get_dimension.return_value = 256 mock_model.encode.return_value = np.random.rand(256).astype(np.float32) # Create real cache and indexer with mock model cache = EmbeddingCache( cache_dir=semantic_base_dir / ".cache", model=mock_model, ) def get_files() -> list: return list(semantic_base_dir.rglob("*.md")) indexer = EmbeddingIndexer(cache, mock_model, get_files, semantic_base_dir) sem_ctx = SemanticContext(model=mock_model, cache=cache, indexer=indexer) # Set the semantic context cache in dependencies module deps._semantic_ctx_cache = sem_ctx return sem_ctx def test_index_status_enabled( self, semantic_base_dir: Path, mock_semantic_context ) -> None: """index_status returns state when enabled.""" result = server_module.index_status.fn(**_semantic_dep()) assert "state" in result assert result["state"] in ["idle", "indexing", "ready"] def test_index_wait_success( self, semantic_base_dir: Path, mock_semantic_context ) -> None: """index_wait returns success=true when indexing completes.""" server_module.index_refresh.fn(**_semantic_dep()) result = server_module.index_wait.fn(timeout=5.0, **_semantic_dep()) assert result["success"] is True assert result["state"] == "ready" def test_index_wait_idle( self, semantic_base_dir: Path, mock_semantic_context ) -> None: """index_wait returns success=true immediately when idle.""" result = server_module.index_wait.fn(timeout=0.1, **_semantic_dep()) # When idle (never started), wait returns immediately with success=true assert result["success"] is True assert result["state"] == "idle" def 
test_index_refresh_enabled( self, semantic_base_dir: Path, mock_semantic_context ) -> None: """index_refresh starts indexing when enabled.""" result = server_module.index_refresh.fn(**_semantic_dep()) assert "message" in result assert result["message"] in ["Indexing started", "Indexing already in progress"] mock_semantic_context.indexer.wait(timeout=5.0) def test_query_with_semantic_search( self, semantic_base_dir: Path, mock_semantic_context ) -> None: """query can use embed() function after indexing.""" server_module.index_refresh.fn(**_semantic_dep()) mock_semantic_context.indexer.wait(timeout=5.0) result = server_module.query.fn( glob="**/*.md", sql="""SELECT path, array_cosine_similarity(embedding, embed('test')) as score FROM files ORDER BY score DESC LIMIT 2""", **_query_deps(), ) assert result["row_count"] == 2 assert "score" in result["columns"] def test_query_inspect_includes_embedding( self, semantic_base_dir: Path, mock_semantic_context ) -> None: """query_inspect includes embedding in schema when semantic search ready.""" server_module.index_refresh.fn(**_semantic_dep()) mock_semantic_context.indexer.wait(timeout=5.0) result = server_module.query_inspect.fn(glob="**/*.md", **_query_deps()) assert "embedding" in result["schema"] assert result["schema"]["embedding"]["type"] == "FLOAT[256]" assert result["schema"]["embedding"]["nullable"] is False

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/kzmshx/frontmatter-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.