Skip to main content
Glama

MCP Memory Service

""" Comprehensive tests for SQLite-vec storage backend. """ import pytest import asyncio import tempfile import os import shutil import json from unittest.mock import Mock, patch import time # Skip tests if sqlite-vec is not available try: import sqlite_vec SQLITE_VEC_AVAILABLE = True except ImportError: SQLITE_VEC_AVAILABLE = False from src.mcp_memory_service.models.memory import Memory, MemoryQueryResult from src.mcp_memory_service.utils.hashing import generate_content_hash if SQLITE_VEC_AVAILABLE: from src.mcp_memory_service.storage.sqlite_vec import SqliteVecMemoryStorage # Skip all tests if sqlite-vec is not available pytestmark = pytest.mark.skipif(not SQLITE_VEC_AVAILABLE, reason="sqlite-vec not available") class TestSqliteVecStorage: """Test suite for SQLite-vec storage functionality.""" @pytest.fixture async def storage(self): """Create a test storage instance.""" temp_dir = tempfile.mkdtemp() db_path = os.path.join(temp_dir, "test_memory.db") storage = SqliteVecMemoryStorage(db_path) await storage.initialize() yield storage # Cleanup if storage.conn: storage.conn.close() shutil.rmtree(temp_dir, ignore_errors=True) @pytest.fixture def sample_memory(self): """Create a sample memory for testing.""" content = "This is a test memory for SQLite-vec storage" return Memory( content=content, content_hash=generate_content_hash(content), tags=["test", "sqlite-vec"], memory_type="note", metadata={"priority": "medium", "category": "testing"} ) @pytest.mark.asyncio async def test_initialization(self): """Test storage initialization.""" temp_dir = tempfile.mkdtemp() db_path = os.path.join(temp_dir, "test_init.db") try: storage = SqliteVecMemoryStorage(db_path) await storage.initialize() # Check that database file was created assert os.path.exists(db_path) # Check that connection is established assert storage.conn is not None # Check that table was created cursor = storage.conn.execute(''' SELECT name FROM sqlite_master WHERE type='table' AND name='memories' ''') assert cursor.fetchone() is not None storage.close() finally: shutil.rmtree(temp_dir, ignore_errors=True) @pytest.mark.asyncio async def test_store_memory(self, storage, sample_memory): """Test storing a memory.""" success, message = await storage.store(sample_memory) assert success assert "successfully" in message.lower() # Verify memory was stored cursor = storage.conn.execute( 'SELECT content_hash FROM memories WHERE content_hash = ?', (sample_memory.content_hash,) ) result = cursor.fetchone() assert result is not None assert result[0] == sample_memory.content_hash @pytest.mark.asyncio async def test_store_duplicate_memory(self, storage, sample_memory): """Test that duplicate memories are rejected.""" # Store the memory first time success, message = await storage.store(sample_memory) assert success # Try to store the same memory again success, message = await storage.store(sample_memory) assert not success assert "duplicate" in message.lower() @pytest.mark.asyncio async def test_retrieve_memory(self, storage, sample_memory): """Test retrieving memories using semantic search.""" # Store the memory await storage.store(sample_memory) # Retrieve using semantic search results = await storage.retrieve("test memory sqlite", n_results=5) assert len(results) > 0 assert isinstance(results[0], MemoryQueryResult) assert results[0].memory.content_hash == sample_memory.content_hash assert results[0].relevance_score >= 0.0 assert results[0].debug_info["backend"] == "sqlite-vec" @pytest.mark.asyncio async def test_retrieve_no_results(self, storage): """Test retrieving when no memories match.""" results = await storage.retrieve("nonexistent query", n_results=5) assert len(results) == 0 @pytest.mark.asyncio async def test_search_by_tag(self, storage, sample_memory): """Test searching memories by tags.""" # Store the memory await storage.store(sample_memory) # Search by existing tag results = await storage.search_by_tag(["test"]) assert len(results) == 1 assert results[0].content_hash == sample_memory.content_hash # Search by non-existent tag results = await storage.search_by_tag(["nonexistent"]) assert len(results) == 0 # Search by multiple tags results = await storage.search_by_tag(["test", "sqlite-vec"]) assert len(results) == 1 @pytest.mark.asyncio async def test_search_by_empty_tags(self, storage): """Test searching with empty tags list.""" results = await storage.search_by_tag([]) assert len(results) == 0 @pytest.mark.asyncio async def test_delete_memory(self, storage, sample_memory): """Test deleting a memory by content hash.""" # Store the memory await storage.store(sample_memory) # Delete the memory success, message = await storage.delete(sample_memory.content_hash) assert success assert sample_memory.content_hash in message # Verify memory was deleted cursor = storage.conn.execute( 'SELECT content_hash FROM memories WHERE content_hash = ?', (sample_memory.content_hash,) ) assert cursor.fetchone() is None @pytest.mark.asyncio async def test_delete_nonexistent_memory(self, storage): """Test deleting a non-existent memory.""" nonexistent_hash = "nonexistent123456789" success, message = await storage.delete(nonexistent_hash) assert not success assert "not found" in message.lower() @pytest.mark.asyncio async def test_delete_by_tag(self, storage): """Test deleting memories by tag.""" # Store multiple memories with different tags memory1 = Memory( content="Memory 1", content_hash=generate_content_hash("Memory 1"), tags=["tag1", "shared"] ) memory2 = Memory( content="Memory 2", content_hash=generate_content_hash("Memory 2"), tags=["tag2", "shared"] ) memory3 = Memory( content="Memory 3", content_hash=generate_content_hash("Memory 3"), tags=["tag3"] ) await storage.store(memory1) await storage.store(memory2) await storage.store(memory3) # Delete by shared tag count, message = await storage.delete_by_tag("shared") assert count == 2 assert "deleted 2 memories" in message.lower() # Verify correct memories were deleted remaining = await storage.search_by_tag(["tag3"]) assert len(remaining) == 1 assert remaining[0].content_hash == memory3.content_hash @pytest.mark.asyncio async def test_delete_by_nonexistent_tag(self, storage): """Test deleting by a non-existent tag.""" count, message = await storage.delete_by_tag("nonexistent") assert count == 0 assert "no memories found" in message.lower() @pytest.mark.asyncio async def test_cleanup_duplicates(self, storage): """Test cleaning up duplicate memories.""" # Create memory content = "Duplicate test memory" memory = Memory( content=content, content_hash=generate_content_hash(content), tags=["duplicate"] ) # Store the memory await storage.store(memory) # Manually insert a duplicate (bypassing duplicate check) embedding = storage._generate_embedding(content) storage.conn.execute(''' INSERT INTO memories ( content_embedding, content_hash, content, tags, memory_type, metadata, created_at, updated_at, created_at_iso, updated_at_iso ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?) ''', ( sqlite_vec.serialize_float32(embedding), memory.content_hash, content, "duplicate", None, "{}", time.time(), time.time(), "2024-01-01T00:00:00Z", "2024-01-01T00:00:00Z" )) storage.conn.commit() # Clean up duplicates count, message = await storage.cleanup_duplicates() assert count == 1 assert "removed 1 duplicate" in message.lower() # Verify only one copy remains cursor = storage.conn.execute( 'SELECT COUNT(*) FROM memories WHERE content_hash = ?', (memory.content_hash,) ) assert cursor.fetchone()[0] == 1 @pytest.mark.asyncio async def test_cleanup_no_duplicates(self, storage, sample_memory): """Test cleanup when no duplicates exist.""" await storage.store(sample_memory) count, message = await storage.cleanup_duplicates() assert count == 0 assert "no duplicate memories found" in message.lower() @pytest.mark.asyncio async def test_update_memory_metadata(self, storage, sample_memory): """Test updating memory metadata.""" # Store the memory await storage.store(sample_memory) # Update metadata updates = { "tags": ["updated", "test"], "memory_type": "reminder", "metadata": {"priority": "high", "due_date": "2024-01-15"}, "status": "active" } success, message = await storage.update_memory_metadata( content_hash=sample_memory.content_hash, updates=updates ) assert success assert "updated fields" in message.lower() # Verify updates cursor = storage.conn.execute(''' SELECT tags, memory_type, metadata FROM memories WHERE content_hash = ? ''', (sample_memory.content_hash,)) row = cursor.fetchone() assert row is not None tags_str, memory_type, metadata_str = row metadata = json.loads(metadata_str) assert tags_str == "updated,test" assert memory_type == "reminder" assert metadata["priority"] == "high" assert metadata["due_date"] == "2024-01-15" assert metadata["status"] == "active" @pytest.mark.asyncio async def test_update_nonexistent_memory(self, storage): """Test updating metadata for non-existent memory.""" nonexistent_hash = "nonexistent123456789" success, message = await storage.update_memory_metadata( content_hash=nonexistent_hash, updates={"tags": ["test"]} ) assert not success assert "not found" in message.lower() @pytest.mark.asyncio async def test_update_memory_with_invalid_tags(self, storage, sample_memory): """Test updating memory with invalid tags format.""" await storage.store(sample_memory) success, message = await storage.update_memory_metadata( content_hash=sample_memory.content_hash, updates={"tags": "not_a_list"} ) assert not success assert "list of strings" in message.lower() @pytest.mark.asyncio async def test_update_memory_with_invalid_metadata(self, storage, sample_memory): """Test updating memory with invalid metadata format.""" await storage.store(sample_memory) success, message = await storage.update_memory_metadata( content_hash=sample_memory.content_hash, updates={"metadata": "not_a_dict"} ) assert not success assert "dictionary" in message.lower() @pytest.mark.asyncio async def test_update_memory_preserve_timestamps(self, storage, sample_memory): """Test updating memory while preserving timestamps.""" await storage.store(sample_memory) # Get original timestamps cursor = storage.conn.execute(''' SELECT created_at, created_at_iso FROM memories WHERE content_hash = ? ''', (sample_memory.content_hash,)) original_created_at, original_created_at_iso = cursor.fetchone() # Wait a moment await asyncio.sleep(0.1) # Update with preserve_timestamps=True success, message = await storage.update_memory_metadata( content_hash=sample_memory.content_hash, updates={"tags": ["updated"]}, preserve_timestamps=True ) assert success # Check timestamps cursor = storage.conn.execute(''' SELECT created_at, created_at_iso, updated_at FROM memories WHERE content_hash = ? ''', (sample_memory.content_hash,)) created_at, created_at_iso, updated_at = cursor.fetchone() # created_at should be preserved assert abs(created_at - original_created_at) < 0.01 assert created_at_iso == original_created_at_iso # updated_at should be newer assert updated_at > original_created_at @pytest.mark.asyncio async def test_update_memory_reset_timestamps(self, storage, sample_memory): """Test updating memory with timestamp reset.""" await storage.store(sample_memory) # Get original timestamps cursor = storage.conn.execute(''' SELECT created_at FROM memories WHERE content_hash = ? ''', (sample_memory.content_hash,)) original_created_at = cursor.fetchone()[0] # Wait a moment await asyncio.sleep(0.1) # Update with preserve_timestamps=False success, message = await storage.update_memory_metadata( content_hash=sample_memory.content_hash, updates={"tags": ["updated"]}, preserve_timestamps=False ) assert success # Check timestamps cursor = storage.conn.execute(''' SELECT created_at FROM memories WHERE content_hash = ? ''', (sample_memory.content_hash,)) created_at = cursor.fetchone()[0] # created_at should be updated (newer) assert created_at > original_created_at def test_get_stats(self, storage): """Test getting storage statistics.""" stats = storage.get_stats() assert isinstance(stats, dict) assert stats["backend"] == "sqlite-vec" assert "total_memories" in stats assert "database_size_bytes" in stats assert "embedding_model" in stats assert "embedding_dimension" in stats @pytest.mark.asyncio async def test_get_stats_with_data(self, storage, sample_memory): """Test getting statistics with data.""" await storage.store(sample_memory) stats = storage.get_stats() assert stats["total_memories"] >= 1 assert stats["database_size_bytes"] > 0 assert stats["embedding_dimension"] == storage.embedding_dimension def test_close_connection(self, storage): """Test closing the database connection.""" assert storage.conn is not None storage.close() assert storage.conn is None @pytest.mark.asyncio async def test_multiple_memories_retrieval(self, storage): """Test retrieving multiple memories with ranking.""" # Store multiple memories memories = [] for i in range(5): content = f"Test memory {i} with different content and keywords" memory = Memory( content=content, content_hash=generate_content_hash(content), tags=[f"tag{i}"], memory_type="note" ) memories.append(memory) await storage.store(memory) # Retrieve memories results = await storage.retrieve("test memory content", n_results=3) assert len(results) <= 3 assert len(results) > 0 # Check that results are properly ranked (higher relevance first) for i in range(len(results) - 1): assert results[i].relevance_score >= results[i + 1].relevance_score @pytest.mark.asyncio async def test_embedding_generation(self, storage): """Test embedding generation functionality.""" test_text = "This is a test for embedding generation" embedding = storage._generate_embedding(test_text) assert isinstance(embedding, list) assert len(embedding) == storage.embedding_dimension assert all(isinstance(x, float) for x in embedding) @pytest.mark.asyncio async def test_memory_with_complex_metadata(self, storage): """Test storing and retrieving memory with complex metadata.""" content = "Memory with complex metadata" memory = Memory( content=content, content_hash=generate_content_hash(content), tags=["complex", "metadata", "test"], memory_type="structured", metadata={ "nested": {"level1": {"level2": "value"}}, "array": [1, 2, 3, "four"], "boolean": True, "null_value": None, "unicode": "测试中文 🚀" } ) # Store the memory success, message = await storage.store(memory) assert success # Retrieve and verify results = await storage.retrieve("complex metadata", n_results=1) assert len(results) == 1 retrieved_memory = results[0].memory assert retrieved_memory.metadata["nested"]["level1"]["level2"] == "value" assert retrieved_memory.metadata["array"] == [1, 2, 3, "four"] assert retrieved_memory.metadata["boolean"] is True assert retrieved_memory.metadata["unicode"] == "测试中文 🚀" @pytest.mark.asyncio async def test_concurrent_operations(self, storage): """Test concurrent storage operations.""" # Create multiple memories memories = [] for i in range(10): content = f"Concurrent test memory {i}" memory = Memory( content=content, content_hash=generate_content_hash(content), tags=[f"concurrent{i}"] ) memories.append(memory) # Store memories concurrently tasks = [storage.store(memory) for memory in memories] results = await asyncio.gather(*tasks) # All should succeed assert all(success for success, _ in results) # Verify all were stored for memory in memories: cursor = storage.conn.execute( 'SELECT content_hash FROM memories WHERE content_hash = ?', (memory.content_hash,) ) assert cursor.fetchone() is not None class TestSqliteVecStorageWithoutEmbeddings: """Test SQLite-vec storage when sentence transformers is not available.""" @pytest.mark.asyncio async def test_initialization_without_embeddings(self): """Test that storage can initialize without sentence transformers.""" temp_dir = tempfile.mkdtemp() db_path = os.path.join(temp_dir, "test_no_embeddings.db") try: with patch('src.mcp_memory_service.storage.sqlite_vec.SENTENCE_TRANSFORMERS_AVAILABLE', False): storage = SqliteVecMemoryStorage(db_path) await storage.initialize() assert storage.conn is not None assert storage.embedding_model is None storage.close() finally: shutil.rmtree(temp_dir, ignore_errors=True) @pytest.mark.asyncio async def test_operations_without_embeddings(self): """Test basic operations without embeddings.""" temp_dir = tempfile.mkdtemp() db_path = os.path.join(temp_dir, "test_no_embeddings.db") try: with patch('src.mcp_memory_service.storage.sqlite_vec.SENTENCE_TRANSFORMERS_AVAILABLE', False): storage = SqliteVecMemoryStorage(db_path) await storage.initialize() # Store should work (with zero embeddings) content = "Test without embeddings" memory = Memory( content=content, content_hash=generate_content_hash(content), tags=["no-embeddings"] ) success, message = await storage.store(memory) assert success # Tag search should work results = await storage.search_by_tag(["no-embeddings"]) assert len(results) == 1 # Semantic search won't work well but shouldn't crash results = await storage.retrieve("test", n_results=1) # May or may not return results, but shouldn't crash storage.close() finally: shutil.rmtree(temp_dir, ignore_errors=True) if __name__ == "__main__": # Run basic tests when executed directly async def run_basic_tests(): """Run basic tests to verify functionality.""" if not SQLITE_VEC_AVAILABLE: print("⚠️ sqlite-vec not available, skipping tests") return print("Running basic SQLite-vec storage tests...") temp_dir = tempfile.mkdtemp() db_path = os.path.join(temp_dir, "test_basic.db") try: # Test basic functionality storage = SqliteVecMemoryStorage(db_path) await storage.initialize() # Store a memory content = "Test memory for basic validation" memory = Memory( content=content, content_hash=generate_content_hash(content), tags=["test", "basic"] ) success, message = await storage.store(memory) print(f"Store: {success}, {message}") # Retrieve the memory results = await storage.retrieve("test memory", n_results=1) print(f"Retrieve: Found {len(results)} results") if results: print(f"Content: {results[0].memory.content}") print(f"Relevance: {results[0].relevance_score}") # Search by tag tag_results = await storage.search_by_tag(["test"]) print(f"Tag search: Found {len(tag_results)} results") # Get stats stats = storage.get_stats() print(f"Stats: {stats['total_memories']} memories, {stats['database_size_mb']} MB") storage.close() print("✅ Basic tests passed!") except Exception as e: print(f"❌ Basic tests failed: {e}") finally: shutil.rmtree(temp_dir, ignore_errors=True) # Run the basic tests asyncio.run(run_basic_tests())

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/doobidoo/mcp-memory-service'

If you have feedback or need assistance with the MCP directory API, please join our Discord server