MCP Memory Service

"""Unit tests for the semantic clustering engine.""" import pytest import numpy as np from datetime import datetime, timedelta from mcp_memory_service.consolidation.clustering import SemanticClusteringEngine from mcp_memory_service.consolidation.base import MemoryCluster from mcp_memory_service.models.memory import Memory @pytest.mark.unit class TestSemanticClusteringEngine: """Test the semantic clustering system.""" @pytest.fixture def clustering_engine(self, consolidation_config): return SemanticClusteringEngine(consolidation_config) @pytest.mark.asyncio async def test_basic_clustering(self, clustering_engine, sample_memories): """Test basic clustering functionality.""" # Use memories with embeddings memories_with_embeddings = [m for m in sample_memories if m.embedding] clusters = await clustering_engine.process(memories_with_embeddings) assert isinstance(clusters, list) assert all(isinstance(cluster, MemoryCluster) for cluster in clusters) for cluster in clusters: assert len(cluster.memory_hashes) >= clustering_engine.min_cluster_size assert isinstance(cluster.cluster_id, str) assert isinstance(cluster.centroid_embedding, list) assert len(cluster.centroid_embedding) > 0 assert 0 <= cluster.coherence_score <= 1 assert isinstance(cluster.created_at, datetime) assert isinstance(cluster.theme_keywords, list) @pytest.mark.asyncio async def test_clustering_with_similar_embeddings(self, clustering_engine): """Test clustering with known similar embeddings.""" base_time = datetime.now().timestamp() # Create memories with similar embeddings (should cluster together) similar_memories = [] base_embedding = [0.5, 0.4, 0.6, 0.3, 0.7] for i in range(6): # Create enough for min_cluster_size # Add small variations to base embedding embedding = [] for val in base_embedding * 64: # 320-dim noise = np.random.normal(0, 0.05) # Small noise embedding.append(max(0, min(1, val + noise))) memory = Memory( content=f"Similar content about programming topic {i}", content_hash=f"similar_{i}", tags=["programming", "similar"], embedding=embedding, created_at=base_time - (i * 3600) ) similar_memories.append(memory) # Add some different memories for i in range(3): different_embedding = [0.1, 0.9, 0.2, 0.8, 0.1] * 64 memory = Memory( content=f"Different content about weather {i}", content_hash=f"different_{i}", tags=["weather", "different"], embedding=different_embedding, created_at=base_time - (i * 3600) ) similar_memories.append(memory) clusters = await clustering_engine.process(similar_memories) # Should find at least one cluster assert len(clusters) >= 1 # Check that similar memories are clustered together if clusters: # Find cluster with similar memories similar_cluster = None for cluster in clusters: if any("similar" in hash_id for hash_id in cluster.memory_hashes): similar_cluster = cluster break if similar_cluster: # Should contain multiple similar memories similar_hashes = [h for h in similar_cluster.memory_hashes if "similar" in h] assert len(similar_hashes) >= clustering_engine.min_cluster_size @pytest.mark.asyncio async def test_insufficient_memories(self, clustering_engine): """Test handling of insufficient memories for clustering.""" # Create too few memories few_memories = [] for i in range(2): # Less than min_cluster_size memory = Memory( content=f"Content {i}", content_hash=f"hash_{i}", tags=["test"], embedding=[0.5] * 320, created_at=datetime.now().timestamp() ) few_memories.append(memory) clusters = await clustering_engine.process(few_memories) assert clusters == [] @pytest.mark.asyncio async def 
test_memories_without_embeddings(self, clustering_engine): """Test handling of memories without embeddings.""" memories_no_embeddings = [] for i in range(5): memory = Memory( content=f"Content {i}", content_hash=f"hash_{i}", tags=["test"], embedding=None, # No embedding created_at=datetime.now().timestamp() ) memories_no_embeddings.append(memory) clusters = await clustering_engine.process(memories_no_embeddings) assert clusters == [] @pytest.mark.asyncio async def test_theme_keyword_extraction(self, clustering_engine): """Test extraction of theme keywords from clusters.""" # Create memories with common themes themed_memories = [] base_embedding = [0.5, 0.5, 0.5, 0.5, 0.5] * 64 for i in range(5): memory = Memory( content=f"Python programming tutorial {i} about functions and classes", content_hash=f"python_{i}", tags=["python", "programming", "tutorial"], embedding=[val + np.random.normal(0, 0.02) for val in base_embedding], created_at=datetime.now().timestamp() ) themed_memories.append(memory) clusters = await clustering_engine.process(themed_memories) if clusters: cluster = clusters[0] # Should extract relevant theme keywords assert len(cluster.theme_keywords) > 0 # Should include frequent tags common_tags = {"python", "programming", "tutorial"} found_tags = set(cluster.theme_keywords).intersection(common_tags) assert len(found_tags) > 0 @pytest.mark.asyncio async def test_cluster_metadata(self, clustering_engine, sample_memories): """Test that cluster metadata is properly populated.""" memories_with_embeddings = [m for m in sample_memories if m.embedding] if len(memories_with_embeddings) >= clustering_engine.min_cluster_size: clusters = await clustering_engine.process(memories_with_embeddings) for cluster in clusters: assert 'algorithm' in cluster.metadata assert 'cluster_size' in cluster.metadata assert 'average_memory_age' in cluster.metadata assert 'tag_distribution' in cluster.metadata assert cluster.metadata['cluster_size'] == len(cluster.memory_hashes) assert isinstance(cluster.metadata['average_memory_age'], float) assert isinstance(cluster.metadata['tag_distribution'], dict) @pytest.mark.asyncio async def test_simple_clustering_fallback(self, clustering_engine): """Test simple clustering algorithm fallback.""" # Force simple clustering algorithm original_algorithm = clustering_engine.algorithm clustering_engine.algorithm = 'simple' try: # Create memories with known similarity patterns similar_memories = [] # Group 1: High similarity base1 = [0.8, 0.7, 0.9, 0.8, 0.7] * 64 for i in range(4): embedding = [val + np.random.normal(0, 0.01) for val in base1] memory = Memory( content=f"Group 1 content {i}", content_hash=f"group1_{i}", tags=["group1"], embedding=embedding, created_at=datetime.now().timestamp() ) similar_memories.append(memory) # Group 2: Different but internally similar base2 = [0.2, 0.3, 0.1, 0.2, 0.3] * 64 for i in range(4): embedding = [val + np.random.normal(0, 0.01) for val in base2] memory = Memory( content=f"Group 2 content {i}", content_hash=f"group2_{i}", tags=["group2"], embedding=embedding, created_at=datetime.now().timestamp() ) similar_memories.append(memory) clusters = await clustering_engine.process(similar_memories) # Simple algorithm should still find clusters assert isinstance(clusters, list) finally: clustering_engine.algorithm = original_algorithm @pytest.mark.asyncio async def test_merge_similar_clusters(self, clustering_engine): """Test merging of similar clusters.""" # Create two similar clusters cluster1 = MemoryCluster( cluster_id="cluster1", 
memory_hashes=["hash1", "hash2"], centroid_embedding=[0.5, 0.5, 0.5] * 107, # ~320 dim coherence_score=0.8, created_at=datetime.now(), theme_keywords=["python", "programming"] ) cluster2 = MemoryCluster( cluster_id="cluster2", memory_hashes=["hash3", "hash4"], centroid_embedding=[0.52, 0.48, 0.51] * 107, # Similar to cluster1 coherence_score=0.7, created_at=datetime.now(), theme_keywords=["python", "coding"] ) # Very different cluster cluster3 = MemoryCluster( cluster_id="cluster3", memory_hashes=["hash5", "hash6"], centroid_embedding=[0.1, 0.9, 0.1] * 107, # Very different coherence_score=0.6, created_at=datetime.now(), theme_keywords=["weather", "forecast"] ) clusters = [cluster1, cluster2, cluster3] merged_clusters = await clustering_engine.merge_similar_clusters( clusters, similarity_threshold=0.9 ) # Should merge similar clusters assert len(merged_clusters) <= len(clusters) # Check that merged cluster contains memories from both original clusters if len(merged_clusters) < len(clusters): # Find the merged cluster (should have more memories) merged = max(merged_clusters, key=lambda c: len(c.memory_hashes)) assert len(merged.memory_hashes) >= 4 # Combined from cluster1 and cluster2 @pytest.mark.asyncio async def test_coherence_score_calculation(self, clustering_engine): """Test coherence score calculation for clusters.""" # Create tightly clustered memories tight_memories = [] base_embedding = [0.5, 0.5, 0.5, 0.5, 0.5] * 64 for i in range(5): # Very similar embeddings (high coherence) embedding = [val + np.random.normal(0, 0.01) for val in base_embedding] memory = Memory( content=f"Tight cluster content {i}", content_hash=f"tight_{i}", tags=["tight"], embedding=embedding, created_at=datetime.now().timestamp() ) tight_memories.append(memory) # Create loosely clustered memories loose_memories = [] for i in range(5): # More varied embeddings (lower coherence) embedding = [val + np.random.normal(0, 0.1) for val in base_embedding] memory = Memory( content=f"Loose cluster content {i}", content_hash=f"loose_{i}", tags=["loose"], embedding=embedding, created_at=datetime.now().timestamp() ) loose_memories.append(memory) tight_clusters = await clustering_engine.process(tight_memories) loose_clusters = await clustering_engine.process(loose_memories) # Tight clusters should have higher coherence scores if tight_clusters and loose_clusters: tight_coherence = tight_clusters[0].coherence_score loose_coherence = loose_clusters[0].coherence_score # This may not always be true due to randomness, but generally should be # Just check that coherence scores are in valid range assert 0 <= tight_coherence <= 1 assert 0 <= loose_coherence <= 1 @pytest.mark.asyncio async def test_algorithm_fallback_handling(self, clustering_engine): """Test handling of different clustering algorithms.""" memories = [] base_embedding = [0.5, 0.4, 0.6, 0.3, 0.7] * 64 for i in range(8): # Enough for clustering embedding = [val + np.random.normal(0, 0.05) for val in base_embedding] memory = Memory( content=f"Test content {i}", content_hash=f"test_{i}", tags=["test"], embedding=embedding, created_at=datetime.now().timestamp() ) memories.append(memory) # Test different algorithms algorithms = ['simple', 'dbscan', 'hierarchical'] for algorithm in algorithms: original_algorithm = clustering_engine.algorithm clustering_engine.algorithm = algorithm try: clusters = await clustering_engine.process(memories) # All algorithms should return valid clusters assert isinstance(clusters, list) for cluster in clusters: assert isinstance(cluster, 
MemoryCluster) assert cluster.metadata['algorithm'] in [algorithm, f"{algorithm}_merged"] finally: clustering_engine.algorithm = original_algorithm @pytest.mark.asyncio async def test_empty_input_handling(self, clustering_engine): """Test handling of empty input.""" clusters = await clustering_engine.process([]) assert clusters == [] @pytest.mark.asyncio async def test_average_age_calculation(self, clustering_engine): """Test average age calculation in cluster metadata.""" now = datetime.now() memories = [] # Create memories with known ages ages = [1, 3, 5, 7, 9] # days ago for i, age in enumerate(ages): memory = Memory( content=f"Content {i}", content_hash=f"age_test_{i}", tags=["age_test"], embedding=[0.5 + i*0.01] * 320, # Slightly different embeddings created_at=(now - timedelta(days=age)).timestamp() ) memories.append(memory) clusters = await clustering_engine.process(memories) if clusters: cluster = clusters[0] avg_age = cluster.metadata['average_memory_age'] # Average age should be approximately the mean of our test ages expected_avg = sum(ages) / len(ages) assert abs(avg_age - expected_avg) < 1 # Within 1 day tolerance @pytest.mark.asyncio async def test_tag_distribution_analysis(self, clustering_engine): """Test tag distribution analysis in clusters.""" memories = [] base_embedding = [0.5] * 320 # Create memories with specific tag patterns tag_patterns = [ ["python", "programming"], ["python", "tutorial"], ["programming", "guide"], ["python", "programming"], # Duplicate pattern ["tutorial", "guide"] ] for i, tags in enumerate(tag_patterns): memory = Memory( content=f"Content {i}", content_hash=f"tag_test_{i}", tags=tags, embedding=[val + i*0.01 for val in base_embedding], created_at=datetime.now().timestamp() ) memories.append(memory) clusters = await clustering_engine.process(memories) if clusters: cluster = clusters[0] tag_dist = cluster.metadata['tag_distribution'] # Should count tag frequencies correctly assert isinstance(tag_dist, dict) assert tag_dist.get("python", 0) >= 2 # Appears multiple times assert tag_dist.get("programming", 0) >= 2 # Appears multiple times
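
The suite drives the engine entirely through its async process() entry point (plus merge_similar_clusters), with the consolidation_config and sample_memories fixtures supplied by the project's shared test configuration. For orientation, here is a minimal standalone sketch of the same call pattern; it assumes config is a valid consolidation config object and memories is a list of Memory objects with embeddings, and the helper name summarize_clusters is hypothetical:

import asyncio

from mcp_memory_service.consolidation.clustering import SemanticClusteringEngine


async def summarize_clusters(config, memories):
    # Illustrative sketch, not repo code: build the engine from a
    # consolidation config, exactly as the clustering_engine fixture does.
    engine = SemanticClusteringEngine(config)

    # process() is async and returns a list of MemoryCluster objects.
    clusters = await engine.process(memories)

    for cluster in clusters:
        # Each cluster carries an id, member hashes, and theme keywords,
        # as the assertions in the tests above show.
        print(cluster.cluster_id, len(cluster.memory_hashes), cluster.theme_keywords)
    return clusters

# Example invocation (config and memories must be supplied by the caller):
# asyncio.run(summarize_clusters(config, memories))

To run the suite itself, pytest -m unit with pytest-asyncio installed should select these tests, assuming the unit marker is registered in the project's pytest configuration.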
