# We provide all the information about MCP servers via our MCP API:
#   curl -X GET 'https://glama.ai/api/mcp/v1/servers/doobidoo/mcp-memory-service'
# If you have feedback or need assistance with the MCP directory API, please join our Discord server.
"""Unit tests for the semantic clustering engine."""
import pytest
import numpy as np
from datetime import datetime, timedelta
from mcp_memory_service.consolidation.clustering import SemanticClusteringEngine
from mcp_memory_service.consolidation.base import MemoryCluster
from mcp_memory_service.models.memory import Memory
@pytest.mark.unit
class TestSemanticClusteringEngine:
    """Tests for the semantic clustering engine."""

    @pytest.fixture
    def clustering_engine(self, consolidation_config):
        """Build a clustering engine from the shared consolidation config."""
        return SemanticClusteringEngine(consolidation_config)
@pytest.mark.asyncio
async def test_basic_clustering(self, clustering_engine, sample_memories):
"""Test basic clustering functionality."""
# Use memories with embeddings
memories_with_embeddings = [m for m in sample_memories if m.embedding]
clusters = await clustering_engine.process(memories_with_embeddings)
assert isinstance(clusters, list)
assert all(isinstance(cluster, MemoryCluster) for cluster in clusters)
for cluster in clusters:
assert len(cluster.memory_hashes) >= clustering_engine.min_cluster_size
assert isinstance(cluster.cluster_id, str)
assert isinstance(cluster.centroid_embedding, list)
assert len(cluster.centroid_embedding) > 0
assert 0 <= cluster.coherence_score <= 1
assert isinstance(cluster.created_at, datetime)
assert isinstance(cluster.theme_keywords, list)
@pytest.mark.asyncio
async def test_clustering_with_similar_embeddings(self, clustering_engine):
"""Test clustering with known similar embeddings."""
base_time = datetime.now().timestamp()
# Create memories with similar embeddings (should cluster together)
similar_memories = []
base_embedding = [0.5, 0.4, 0.6, 0.3, 0.7]
for i in range(6): # Create enough for min_cluster_size
# Add small variations to base embedding
embedding = []
for val in base_embedding * 64: # 320-dim
noise = np.random.normal(0, 0.05) # Small noise
embedding.append(max(0, min(1, val + noise)))
memory = Memory(
content=f"Similar content about programming topic {i}",
content_hash=f"similar_{i}",
tags=["programming", "similar"],
embedding=embedding,
created_at=base_time - (i * 3600)
)
similar_memories.append(memory)
# Add some different memories
for i in range(3):
different_embedding = [0.1, 0.9, 0.2, 0.8, 0.1] * 64
memory = Memory(
content=f"Different content about weather {i}",
content_hash=f"different_{i}",
tags=["weather", "different"],
embedding=different_embedding,
created_at=base_time - (i * 3600)
)
similar_memories.append(memory)
clusters = await clustering_engine.process(similar_memories)
# Should find at least one cluster
assert len(clusters) >= 1
# Check that similar memories are clustered together
if clusters:
# Find cluster with similar memories
similar_cluster = None
for cluster in clusters:
if any("similar" in hash_id for hash_id in cluster.memory_hashes):
similar_cluster = cluster
break
if similar_cluster:
# Should contain multiple similar memories
similar_hashes = [h for h in similar_cluster.memory_hashes if "similar" in h]
assert len(similar_hashes) >= clustering_engine.min_cluster_size
@pytest.mark.asyncio
async def test_insufficient_memories(self, clustering_engine):
"""Test handling of insufficient memories for clustering."""
# Create too few memories
few_memories = []
for i in range(2): # Less than min_cluster_size
memory = Memory(
content=f"Content {i}",
content_hash=f"hash_{i}",
tags=["test"],
embedding=[0.5] * 320,
created_at=datetime.now().timestamp()
)
few_memories.append(memory)
clusters = await clustering_engine.process(few_memories)
assert clusters == []
@pytest.mark.asyncio
async def test_memories_without_embeddings(self, clustering_engine):
"""Test handling of memories without embeddings."""
memories_no_embeddings = []
for i in range(5):
memory = Memory(
content=f"Content {i}",
content_hash=f"hash_{i}",
tags=["test"],
embedding=None, # No embedding
created_at=datetime.now().timestamp()
)
memories_no_embeddings.append(memory)
clusters = await clustering_engine.process(memories_no_embeddings)
assert clusters == []
@pytest.mark.asyncio
async def test_theme_keyword_extraction(self, clustering_engine):
"""Test extraction of theme keywords from clusters."""
# Create memories with common themes
themed_memories = []
base_embedding = [0.5, 0.5, 0.5, 0.5, 0.5] * 64
for i in range(5):
memory = Memory(
content=f"Python programming tutorial {i} about functions and classes",
content_hash=f"python_{i}",
tags=["python", "programming", "tutorial"],
embedding=[val + np.random.normal(0, 0.02) for val in base_embedding],
created_at=datetime.now().timestamp()
)
themed_memories.append(memory)
clusters = await clustering_engine.process(themed_memories)
if clusters:
cluster = clusters[0]
# Should extract relevant theme keywords
assert len(cluster.theme_keywords) > 0
# Should include frequent tags
common_tags = {"python", "programming", "tutorial"}
found_tags = set(cluster.theme_keywords).intersection(common_tags)
assert len(found_tags) > 0
@pytest.mark.asyncio
async def test_cluster_metadata(self, clustering_engine, sample_memories):
"""Test that cluster metadata is properly populated."""
memories_with_embeddings = [m for m in sample_memories if m.embedding]
if len(memories_with_embeddings) >= clustering_engine.min_cluster_size:
clusters = await clustering_engine.process(memories_with_embeddings)
for cluster in clusters:
assert 'algorithm' in cluster.metadata
assert 'cluster_size' in cluster.metadata
assert 'average_memory_age' in cluster.metadata
assert 'tag_distribution' in cluster.metadata
assert cluster.metadata['cluster_size'] == len(cluster.memory_hashes)
assert isinstance(cluster.metadata['average_memory_age'], float)
assert isinstance(cluster.metadata['tag_distribution'], dict)
@pytest.mark.asyncio
async def test_simple_clustering_fallback(self, clustering_engine):
"""Test simple clustering algorithm fallback."""
# Force simple clustering algorithm
original_algorithm = clustering_engine.algorithm
clustering_engine.algorithm = 'simple'
try:
# Create memories with known similarity patterns
similar_memories = []
# Group 1: High similarity
base1 = [0.8, 0.7, 0.9, 0.8, 0.7] * 64
for i in range(4):
embedding = [val + np.random.normal(0, 0.01) for val in base1]
memory = Memory(
content=f"Group 1 content {i}",
content_hash=f"group1_{i}",
tags=["group1"],
embedding=embedding,
created_at=datetime.now().timestamp()
)
similar_memories.append(memory)
# Group 2: Different but internally similar
base2 = [0.2, 0.3, 0.1, 0.2, 0.3] * 64
for i in range(4):
embedding = [val + np.random.normal(0, 0.01) for val in base2]
memory = Memory(
content=f"Group 2 content {i}",
content_hash=f"group2_{i}",
tags=["group2"],
embedding=embedding,
created_at=datetime.now().timestamp()
)
similar_memories.append(memory)
clusters = await clustering_engine.process(similar_memories)
# Simple algorithm should still find clusters
assert isinstance(clusters, list)
finally:
clustering_engine.algorithm = original_algorithm
@pytest.mark.asyncio
async def test_merge_similar_clusters(self, clustering_engine):
"""Test merging of similar clusters."""
# Create two similar clusters
cluster1 = MemoryCluster(
cluster_id="cluster1",
memory_hashes=["hash1", "hash2"],
centroid_embedding=[0.5, 0.5, 0.5] * 107, # ~320 dim
coherence_score=0.8,
created_at=datetime.now(),
theme_keywords=["python", "programming"]
)
cluster2 = MemoryCluster(
cluster_id="cluster2",
memory_hashes=["hash3", "hash4"],
centroid_embedding=[0.52, 0.48, 0.51] * 107, # Similar to cluster1
coherence_score=0.7,
created_at=datetime.now(),
theme_keywords=["python", "coding"]
)
# Very different cluster
cluster3 = MemoryCluster(
cluster_id="cluster3",
memory_hashes=["hash5", "hash6"],
centroid_embedding=[0.1, 0.9, 0.1] * 107, # Very different
coherence_score=0.6,
created_at=datetime.now(),
theme_keywords=["weather", "forecast"]
)
clusters = [cluster1, cluster2, cluster3]
merged_clusters = await clustering_engine.merge_similar_clusters(
clusters, similarity_threshold=0.9
)
# Should merge similar clusters
assert len(merged_clusters) <= len(clusters)
# Check that merged cluster contains memories from both original clusters
if len(merged_clusters) < len(clusters):
# Find the merged cluster (should have more memories)
merged = max(merged_clusters, key=lambda c: len(c.memory_hashes))
assert len(merged.memory_hashes) >= 4 # Combined from cluster1 and cluster2
@pytest.mark.asyncio
async def test_coherence_score_calculation(self, clustering_engine):
"""Test coherence score calculation for clusters."""
# Create tightly clustered memories
tight_memories = []
base_embedding = [0.5, 0.5, 0.5, 0.5, 0.5] * 64
for i in range(5):
# Very similar embeddings (high coherence)
embedding = [val + np.random.normal(0, 0.01) for val in base_embedding]
memory = Memory(
content=f"Tight cluster content {i}",
content_hash=f"tight_{i}",
tags=["tight"],
embedding=embedding,
created_at=datetime.now().timestamp()
)
tight_memories.append(memory)
# Create loosely clustered memories
loose_memories = []
for i in range(5):
# More varied embeddings (lower coherence)
embedding = [val + np.random.normal(0, 0.1) for val in base_embedding]
memory = Memory(
content=f"Loose cluster content {i}",
content_hash=f"loose_{i}",
tags=["loose"],
embedding=embedding,
created_at=datetime.now().timestamp()
)
loose_memories.append(memory)
tight_clusters = await clustering_engine.process(tight_memories)
loose_clusters = await clustering_engine.process(loose_memories)
# Tight clusters should have higher coherence scores
if tight_clusters and loose_clusters:
tight_coherence = tight_clusters[0].coherence_score
loose_coherence = loose_clusters[0].coherence_score
# This may not always be true due to randomness, but generally should be
# Just check that coherence scores are in valid range
assert 0 <= tight_coherence <= 1
assert 0 <= loose_coherence <= 1
@pytest.mark.asyncio
async def test_algorithm_fallback_handling(self, clustering_engine):
"""Test handling of different clustering algorithms."""
memories = []
base_embedding = [0.5, 0.4, 0.6, 0.3, 0.7] * 64
for i in range(8): # Enough for clustering
embedding = [val + np.random.normal(0, 0.05) for val in base_embedding]
memory = Memory(
content=f"Test content {i}",
content_hash=f"test_{i}",
tags=["test"],
embedding=embedding,
created_at=datetime.now().timestamp()
)
memories.append(memory)
# Test different algorithms
algorithms = ['simple', 'dbscan', 'hierarchical']
for algorithm in algorithms:
original_algorithm = clustering_engine.algorithm
clustering_engine.algorithm = algorithm
try:
clusters = await clustering_engine.process(memories)
# All algorithms should return valid clusters
assert isinstance(clusters, list)
for cluster in clusters:
assert isinstance(cluster, MemoryCluster)
assert cluster.metadata['algorithm'] in [algorithm, f"{algorithm}_merged"]
finally:
clustering_engine.algorithm = original_algorithm
@pytest.mark.asyncio
async def test_empty_input_handling(self, clustering_engine):
"""Test handling of empty input."""
clusters = await clustering_engine.process([])
assert clusters == []
@pytest.mark.asyncio
async def test_average_age_calculation(self, clustering_engine):
"""Test average age calculation in cluster metadata."""
now = datetime.now()
memories = []
# Create memories with known ages
ages = [1, 3, 5, 7, 9] # days ago
for i, age in enumerate(ages):
memory = Memory(
content=f"Content {i}",
content_hash=f"age_test_{i}",
tags=["age_test"],
embedding=[0.5 + i*0.01] * 320, # Slightly different embeddings
created_at=(now - timedelta(days=age)).timestamp()
)
memories.append(memory)
clusters = await clustering_engine.process(memories)
if clusters:
cluster = clusters[0]
avg_age = cluster.metadata['average_memory_age']
# Average age should be approximately the mean of our test ages
expected_avg = sum(ages) / len(ages)
assert abs(avg_age - expected_avg) < 1 # Within 1 day tolerance
@pytest.mark.asyncio
async def test_tag_distribution_analysis(self, clustering_engine):
"""Test tag distribution analysis in clusters."""
memories = []
base_embedding = [0.5] * 320
# Create memories with specific tag patterns
tag_patterns = [
["python", "programming"],
["python", "tutorial"],
["programming", "guide"],
["python", "programming"], # Duplicate pattern
["tutorial", "guide"]
]
for i, tags in enumerate(tag_patterns):
memory = Memory(
content=f"Content {i}",
content_hash=f"tag_test_{i}",
tags=tags,
embedding=[val + i*0.01 for val in base_embedding],
created_at=datetime.now().timestamp()
)
memories.append(memory)
clusters = await clustering_engine.process(memories)
if clusters:
cluster = clusters[0]
tag_dist = cluster.metadata['tag_distribution']
# Should count tag frequencies correctly
assert isinstance(tag_dist, dict)
assert tag_dist.get("python", 0) >= 2 # Appears multiple times
assert tag_dist.get("programming", 0) >= 2 # Appears multiple times