Skip to main content
Glama
kzmshx
by kzmshx
test_query.py (12.3 kB)
"""Tests for DuckDB query module."""

from datetime import date
from typing import Any
from unittest.mock import MagicMock

import numpy as np

from frontmatter_mcp.query import create_base_connection, execute_query
from frontmatter_mcp.semantic import add_semantic_columns
from frontmatter_mcp.semantic.context import SemanticContext

# Length of every fake embedding; also the dimension the mock model reports.
EMBEDDING_DIM = 256


def create_mock_semantic_context(
    embeddings: dict[str, np.ndarray[Any, Any]],
    model: MagicMock,
) -> SemanticContext:
    """Create a mock SemanticContext for testing.

    Args:
        embeddings: Mapping returned by the mocked cache's
            ``get_all_readonly()``.
        model: Mock embedding model, passed to the context unchanged.

    Returns:
        A ``SemanticContext`` built from ``model``, a mocked cache and a
        mocked indexer.
    """
    mock_cache = MagicMock()
    mock_cache.get_all_readonly.return_value = embeddings
    mock_indexer = MagicMock()
    return SemanticContext(model=model, cache=mock_cache, indexer=mock_indexer)


def _execute_query_simple(records: list[dict[str, Any]], sql: str) -> dict[str, Any]:
    """Helper to execute query with new API (no semantic)."""
    conn = create_base_connection(records)
    return execute_query(conn, sql)


def _make_mock_model() -> MagicMock:
    """Return a mock embedding model that reports ``EMBEDDING_DIM`` dimensions."""
    model = MagicMock()
    model.get_dimension.return_value = EMBEDDING_DIM
    return model


def _random_embedding(seed: int) -> np.ndarray[Any, Any]:
    """Return a reproducible pseudo-random float32 vector.

    A seeded generator is used so that a failing run can be replayed with
    exactly the same fixture values.
    """
    rng = np.random.default_rng(seed)
    return rng.random(EMBEDDING_DIM).astype(np.float32)


def _padded_vector(*head: float) -> np.ndarray[Any, Any]:
    """Return a float32 vector starting with ``head``, zero-padded to size."""
    padding = [0.0] * (EMBEDDING_DIM - len(head))
    return np.array([*head, *padding], dtype=np.float32)


def _execute_semantic_query(
    records: list[dict[str, Any]],
    embeddings: dict[str, np.ndarray[Any, Any]],
    model: MagicMock,
    sql: str,
) -> dict[str, Any]:
    """Run ``sql`` against ``records`` with semantic columns added.

    Builds the mock semantic context, creates the base connection, adds the
    semantic columns to it and then executes the query.
    """
    semantic = create_mock_semantic_context(embeddings, model)
    conn = create_base_connection(records)
    add_semantic_columns(conn, semantic)
    return execute_query(conn, sql)


class TestExecuteQuery:
    """Tests for execute_query function."""

    def test_select_all(self) -> None:
        """Select all columns from records."""
        records = [
            {"path": "a.md", "title": "Title A"},
            {"path": "b.md", "title": "Title B"},
        ]
        result = _execute_query_simple(records, "SELECT * FROM files")

        assert result["row_count"] == 2
        assert "path" in result["columns"]
        assert "title" in result["columns"]

    def test_select_specific_columns(self) -> None:
        """Select specific columns."""
        records = [
            {"path": "a.md", "title": "Title A", "date": date(2025, 11, 27)},
            {"path": "b.md", "title": "Title B", "date": date(2025, 11, 26)},
        ]
        result = _execute_query_simple(records, "SELECT path, title FROM files")

        assert result["columns"] == ["path", "title"]
        assert len(result["results"]) == 2

    def test_where_clause(self) -> None:
        """Filter records with WHERE clause."""
        records = [
            {"path": "a.md", "date": date(2025, 11, 27)},
            {"path": "b.md", "date": date(2025, 11, 26)},
            {"path": "c.md", "date": date(2025, 11, 25)},
        ]
        result = _execute_query_simple(
            records, "SELECT path FROM files WHERE date >= '2025-11-26'"
        )

        assert result["row_count"] == 2
        paths = [r["path"] for r in result["results"]]
        assert "a.md" in paths
        assert "b.md" in paths
        assert "c.md" not in paths

    def test_order_by(self) -> None:
        """Order results."""
        records = [
            {"path": "b.md", "date": date(2025, 11, 26)},
            {"path": "a.md", "date": date(2025, 11, 27)},
        ]
        result = _execute_query_simple(
            records, "SELECT path FROM files ORDER BY date DESC"
        )

        assert result["results"][0]["path"] == "a.md"
        assert result["results"][1]["path"] == "b.md"

    def test_array_contains(self) -> None:
        """Filter by array containment using from_json."""
        records = [
            {"path": "a.md", "tags": ["mcp", "python"]},
            {"path": "b.md", "tags": ["duckdb"]},
            {"path": "c.md", "tags": ["mcp", "duckdb"]},
        ]
        # Arrays are JSON-encoded strings, use from_json to parse
        result = _execute_query_simple(
            records,
            """SELECT path FROM files
               WHERE list_contains(from_json(tags, '["VARCHAR"]'), 'mcp')""",
        )

        assert result["row_count"] == 2
        paths = [r["path"] for r in result["results"]]
        assert "a.md" in paths
        assert "c.md" in paths

    def test_aggregate_count(self) -> None:
        """Count records."""
        records = [
            {"path": "a.md"},
            {"path": "b.md"},
            {"path": "c.md"},
        ]
        result = _execute_query_simple(records, "SELECT COUNT(*) as count FROM files")

        assert result["row_count"] == 1
        assert result["results"][0]["count"] == 3

    def test_unnest_tags(self) -> None:
        """Unnest array and aggregate using from_json."""
        records = [
            {"path": "a.md", "tags": ["mcp", "python"]},
            {"path": "b.md", "tags": ["mcp"]},
        ]
        # Arrays are JSON-encoded strings, use from_json then unnest
        result = _execute_query_simple(
            records,
            """
            SELECT tag, COUNT(*) AS count
            FROM files, UNNEST(from_json(tags, '["VARCHAR"]')) AS t(tag)
            GROUP BY tag
            ORDER BY count DESC
            """,
        )

        assert result["row_count"] == 2
        assert result["results"][0]["tag"] == "mcp"
        assert result["results"][0]["count"] == 2

    def test_empty_records(self) -> None:
        """Handle empty records list."""
        result = _execute_query_simple([], "SELECT * FROM files")

        assert result["row_count"] == 0
        assert result["results"] == []

    def test_null_handling(self) -> None:
        """Handle NULL values in records."""
        records = [
            {"path": "a.md", "summary": "Has summary"},
            {"path": "b.md", "summary": None},
            {"path": "c.md"},  # No summary key at all
        ]
        result = _execute_query_simple(
            records, "SELECT path FROM files WHERE summary IS NULL"
        )

        paths = [r["path"] for r in result["results"]]
        assert "b.md" in paths
        assert "c.md" in paths
        assert "a.md" not in paths

    def test_templater_expressions_mixed_with_dates(self) -> None:
        """Handle Templater expressions mixed with real dates.

        This tests the scenario where template files contain unexpanded
        Templater expressions like '<% tp.date.now("YYYY-MM-DD") %>'
        alongside real date values from normal files.

        The key assertion is that no type error occurs - all values are
        treated as strings and the query executes successfully.
        """
        records = [
            {"path": "a.md", "date": date(2025, 11, 27)},
            {"path": "b.md", "date": date(2025, 11, 26)},
            # Template file with unexpanded Templater expression
            {"path": "template.md", "date": '<% tp.date.now("YYYY-MM-DD") %>'},
        ]

        # Query should not error - all values are strings now
        # Note: String comparison means '<% ...' sorts after '2025-...'
        result = _execute_query_simple(records, "SELECT path, date FROM files")

        # All 3 records are returned without type errors
        assert result["row_count"] == 3

        # To filter out templates, use LIKE or explicit date patterns
        result_filtered = _execute_query_simple(
            records,
            "SELECT path FROM files WHERE date LIKE '2025-%' AND date >= '2025-11-26'",
        )

        assert result_filtered["row_count"] == 2
        paths = [r["path"] for r in result_filtered["results"]]
        assert "a.md" in paths
        assert "b.md" in paths
        assert "template.md" not in paths

    def test_mixed_type_values_in_same_column(self) -> None:
        """Handle mixed types in the same column.

        All values are serialized to strings, so queries work regardless
        of the original Python type.
        """
        records = [
            {"path": "a.md", "value": "string"},
            {"path": "b.md", "value": 42},
            {"path": "c.md", "value": 3.14},
            {"path": "d.md", "value": True},
            {"path": "e.md", "value": ["a", "b"]},
        ]
        result = _execute_query_simple(records, "SELECT path, value FROM files")

        assert result["row_count"] == 5
        # All values are strings
        values = {r["path"]: r["value"] for r in result["results"]}
        assert values["a.md"] == "string"
        assert values["b.md"] == "42"
        assert values["c.md"] == "3.14"
        assert values["d.md"] == "True"
        assert values["e.md"] == '["a", "b"]'


class TestSemanticSearch:
    """Tests for semantic search integration."""

    def test_query_without_embeddings(self) -> None:
        """Query works without embeddings (backward compatibility)."""
        records = [{"path": "a.md", "title": "A"}]
        result = _execute_query_simple(records, "SELECT * FROM files")

        assert result["row_count"] == 1

    def test_query_with_embedding_column(self) -> None:
        """Query includes embedding column when embeddings provided."""
        records = [{"path": "a.md", "title": "A"}]
        embeddings = {"a.md": _random_embedding(seed=0)}
        mock_model = _make_mock_model()
        mock_model.encode.return_value = _random_embedding(seed=1)

        result = _execute_semantic_query(
            records, embeddings, mock_model, "SELECT path, embedding FROM files"
        )

        assert result["row_count"] == 1
        assert "embedding" in result["columns"]
        assert result["results"][0]["embedding"] is not None

    def test_embed_function_registered(self) -> None:
        """embed() function can be used in SQL."""
        records = [{"path": "a.md", "title": "A"}]
        embeddings = {"a.md": _random_embedding(seed=0)}
        mock_model = _make_mock_model()
        mock_model.encode.return_value = _random_embedding(seed=1)

        result = _execute_semantic_query(
            records,
            embeddings,
            mock_model,
            "SELECT path, embed('test query') as query_vec FROM files",
        )

        assert result["row_count"] == 1
        assert "query_vec" in result["columns"]
        mock_model.encode.assert_called_with("test query")

    def test_cosine_similarity_calculation(self) -> None:
        """array_cosine_similarity works with embeddings."""
        # Create embeddings where a.md is more similar to query than b.md
        query_vec = _padded_vector(1.0, 0.0, 0.0)
        vec_a = _padded_vector(0.9, 0.1, 0.0)  # Similar
        vec_b = _padded_vector(0.0, 1.0, 0.0)  # Different

        records = [
            {"path": "a.md", "title": "A"},
            {"path": "b.md", "title": "B"},
        ]
        embeddings = {"a.md": vec_a, "b.md": vec_b}
        mock_model = _make_mock_model()
        mock_model.encode.return_value = query_vec

        result = _execute_semantic_query(
            records,
            embeddings,
            mock_model,
            """
            SELECT path, array_cosine_similarity(embedding, embed('query')) as score
            FROM files
            ORDER BY score DESC
            """,
        )

        assert result["row_count"] == 2
        # a.md should have higher score (more similar)
        assert result["results"][0]["path"] == "a.md"
        assert result["results"][0]["score"] > result["results"][1]["score"]

    def test_embedding_null_for_missing_path(self) -> None:
        """Embedding is NULL for paths not in embeddings dict."""
        records = [
            {"path": "a.md", "title": "A"},
            {"path": "b.md", "title": "B"},  # No embedding for this
        ]
        embeddings = {"a.md": _random_embedding(seed=0)}
        # encode() is deliberately left unconfigured: this query never embeds text.
        mock_model = _make_mock_model()

        result = _execute_semantic_query(
            records,
            embeddings,
            mock_model,
            "SELECT path, embedding FROM files ORDER BY path",
        )

        assert result["row_count"] == 2
        assert result["results"][0]["path"] == "a.md"
        assert result["results"][0]["embedding"] is not None
        assert result["results"][1]["path"] == "b.md"
        assert result["results"][1]["embedding"] is None

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/kzmshx/frontmatter-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.