test_cluster_detector_e2e.py
"""Integration tests for ClusterDetector (T038). End-to-end tests with real (test) storage to verify the full cluster detection workflow. Note: For clustering to work without embeddings, we rely on entity-based similarity. Memories with overlapping entities will cluster together. """ from __future__ import annotations import tempfile import time from pathlib import Path from unittest.mock import patch import pytest from cortexgraph.agents.cluster_detector import ClusterDetector from cortexgraph.agents.models import ClusterAction, ClusterResult from cortexgraph.storage.jsonl_storage import JSONLStorage from cortexgraph.storage.models import Memory # ============================================================================= # Test Fixtures # ============================================================================= @pytest.fixture def temp_storage_dir(): """Create temporary directory for test storage.""" with tempfile.TemporaryDirectory() as tmpdir: yield Path(tmpdir) @pytest.fixture def test_storage(temp_storage_dir: Path) -> JSONLStorage: """Create real JSONL storage with clusterable test data. Creates memories with overlapping entities that will cluster together based on entity similarity in cluster_memories_simple. """ storage = JSONLStorage(str(temp_storage_dir)) now = int(time.time()) # PostgreSQL cluster - 3 memories with shared "PostgreSQL" entity # Should form a cohesive cluster postgres_mem_1 = Memory( id="pg-1", content="PostgreSQL database configuration for production", entities=["PostgreSQL", "Database"], created_at=now - 86400, last_used=now - 3600, use_count=5, strength=1.0, ) postgres_mem_2 = Memory( id="pg-2", content="PostgreSQL connection pooling settings", entities=["PostgreSQL", "Connection"], created_at=now - 86400 * 2, last_used=now - 7200, use_count=3, strength=1.0, ) postgres_mem_3 = Memory( id="pg-3", content="PostgreSQL query optimization tips", entities=["PostgreSQL", "Query"], created_at=now - 86400 * 3, last_used=now - 10800, use_count=2, strength=1.0, ) # API authentication cluster - 2 memories with shared "JWT" entity jwt_mem_1 = Memory( id="jwt-1", content="JWT token generation process", entities=["JWT", "Authentication"], created_at=now - 86400, last_used=now - 1800, use_count=4, strength=1.2, ) jwt_mem_2 = Memory( id="jwt-2", content="JWT token validation middleware", entities=["JWT", "Middleware"], created_at=now - 86400 * 2, last_used=now - 3600, use_count=3, strength=1.1, ) # Isolated memory - won't cluster (no shared entities) isolated_mem = Memory( id="isolated", content="Random unrelated thought about cats", entities=["Cats"], created_at=now - 86400, last_used=now - 1800, use_count=1, strength=1.0, ) # Add memories to storage storage.memories = { "pg-1": postgres_mem_1, "pg-2": postgres_mem_2, "pg-3": postgres_mem_3, "jwt-1": jwt_mem_1, "jwt-2": jwt_mem_2, "isolated": isolated_mem, } return storage @pytest.fixture def cluster_detector_with_storage(test_storage: JSONLStorage) -> ClusterDetector: """Create ClusterDetector with real test storage.""" with patch("cortexgraph.agents.cluster_detector.get_storage", return_value=test_storage): detector = ClusterDetector(dry_run=True) detector._storage = test_storage return detector # ============================================================================= # T038: Integration Test - End-to-End Cluster Detection # ============================================================================= class TestClusterDetectionEndToEnd: """End-to-end integration tests for cluster detection workflow.""" def 
test_full_cluster_workflow( self, cluster_detector_with_storage: ClusterDetector, test_storage: JSONLStorage ) -> None: """Test complete cluster detection from scan to results.""" detector = cluster_detector_with_storage # Execute full run results = detector.run() # All results should be ClusterResult for result in results: assert isinstance(result, ClusterResult) def test_isolated_memory_not_clustered( self, cluster_detector_with_storage: ClusterDetector ) -> None: """Test isolated memory doesn't appear in clusters.""" detector = cluster_detector_with_storage # Scan for clusterable memories memory_ids = detector.scan() # Isolated memory should not be in results (no shared entities) assert "isolated" not in memory_ids def test_action_based_on_cohesion(self, cluster_detector_with_storage: ClusterDetector) -> None: """Test actions are recommended based on cohesion thresholds.""" detector = cluster_detector_with_storage results = detector.run() # All results should have valid actions matching cohesion thresholds for result in results: assert result.action in ClusterAction # Verify action matches cohesion if result.cohesion >= 0.75: assert result.action == ClusterAction.MERGE elif result.cohesion >= 0.40: assert result.action == ClusterAction.LINK else: assert result.action == ClusterAction.IGNORE def test_cohesion_in_valid_range(self, cluster_detector_with_storage: ClusterDetector) -> None: """Test cohesion scores are in valid range [0, 1].""" detector = cluster_detector_with_storage results = detector.run() for result in results: assert 0.0 <= result.cohesion <= 1.0, f"Cohesion {result.cohesion} out of range" def test_confidence_in_valid_range( self, cluster_detector_with_storage: ClusterDetector ) -> None: """Test confidence scores are in valid range [0, 1].""" detector = cluster_detector_with_storage results = detector.run() for result in results: assert 0.0 <= result.confidence <= 1.0, f"Confidence {result.confidence} out of range" def test_cluster_has_minimum_members( self, cluster_detector_with_storage: ClusterDetector ) -> None: """Test all clusters meet minimum size requirement.""" detector = cluster_detector_with_storage results = detector.run() for result in results: assert len(result.memory_ids) >= 2, "Cluster must have at least 2 members" def test_dry_run_no_mutations( self, cluster_detector_with_storage: ClusterDetector, test_storage: JSONLStorage ) -> None: """Test dry_run mode doesn't mutate storage.""" detector = cluster_detector_with_storage # Capture original state original_count = len(test_storage.memories) original_ids = set(test_storage.memories.keys()) # Run detector detector.run() # Verify no mutations assert len(test_storage.memories) == original_count assert set(test_storage.memories.keys()) == original_ids def test_stats_reflect_run(self, cluster_detector_with_storage: ClusterDetector) -> None: """Test statistics are updated correctly after run.""" detector = cluster_detector_with_storage # Initial stats should be zero initial_stats = detector.get_stats() assert initial_stats["processed"] == 0 # Run detector results = detector.run() # Stats should reflect processed count final_stats = detector.get_stats() assert final_stats["processed"] == len(results) assert final_stats["errors"] == 0 class TestClusterDetectorEdgeCases: """Integration tests for edge cases.""" def test_empty_storage(self, temp_storage_dir: Path) -> None: """Test behavior with empty storage.""" storage = JSONLStorage(str(temp_storage_dir)) storage.memories = {} with 
patch("cortexgraph.agents.cluster_detector.get_storage", return_value=storage): detector = ClusterDetector(dry_run=True) detector._storage = storage results = detector.run() assert results == [] assert detector.get_stats()["processed"] == 0 def test_single_memory(self, temp_storage_dir: Path) -> None: """Test behavior with only one memory (can't cluster).""" storage = JSONLStorage(str(temp_storage_dir)) now = int(time.time()) single = Memory( id="single", content="A lonely memory", entities=["Test"], created_at=now, last_used=now, use_count=1, strength=1.0, ) storage.memories = {"single": single} with patch("cortexgraph.agents.cluster_detector.get_storage", return_value=storage): detector = ClusterDetector(dry_run=True) detector._storage = storage results = detector.run() # Single memory can't form a cluster assert results == [] def test_no_overlapping_entities(self, temp_storage_dir: Path) -> None: """Test behavior when memories have no overlapping entities.""" storage = JSONLStorage(str(temp_storage_dir)) now = int(time.time()) mem_1 = Memory( id="mem-1", content="About cats", entities=["Cats"], created_at=now, last_used=now, use_count=1, strength=1.0, ) mem_2 = Memory( id="mem-2", content="About dogs", entities=["Dogs"], created_at=now, last_used=now, use_count=1, strength=1.0, ) mem_3 = Memory( id="mem-3", content="About birds", entities=["Birds"], created_at=now, last_used=now, use_count=1, strength=1.0, ) storage.memories = {"mem-1": mem_1, "mem-2": mem_2, "mem-3": mem_3} with patch("cortexgraph.agents.cluster_detector.get_storage", return_value=storage): detector = ClusterDetector(dry_run=True) detector._storage = storage results = detector.run() # No shared entities = no clusters assert results == [] def test_custom_similarity_threshold(self, temp_storage_dir: Path) -> None: """Test custom similarity threshold affects clustering.""" storage = JSONLStorage(str(temp_storage_dir)) now = int(time.time()) # Create memories with partial entity overlap mem_1 = Memory( id="mem-1", content="Database config", entities=["Database", "Config"], created_at=now, last_used=now, use_count=1, strength=1.0, ) mem_2 = Memory( id="mem-2", content="Database backup", entities=["Database", "Backup"], created_at=now, last_used=now, use_count=1, strength=1.0, ) storage.memories = {"mem-1": mem_1, "mem-2": mem_2} with patch("cortexgraph.agents.cluster_detector.get_storage", return_value=storage): # Lower threshold should be more permissive detector_low = ClusterDetector(dry_run=True, similarity_threshold=0.20) detector_low._storage = storage # Higher threshold should be stricter detector_high = ClusterDetector(dry_run=True, similarity_threshold=0.80) detector_high._storage = storage results_low = detector_low.run() results_high = detector_high.run() # Lower threshold should capture more (or same) clusters assert len(results_low) >= len(results_high) def test_custom_min_cluster_size(self, temp_storage_dir: Path) -> None: """Test custom minimum cluster size.""" storage = JSONLStorage(str(temp_storage_dir)) now = int(time.time()) # Create 2 memories with shared entity mem_1 = Memory( id="mem-1", content="Python basics", entities=["Python"], created_at=now, last_used=now, use_count=1, strength=1.0, ) mem_2 = Memory( id="mem-2", content="Python advanced", entities=["Python"], created_at=now, last_used=now, use_count=1, strength=1.0, ) storage.memories = {"mem-1": mem_1, "mem-2": mem_2} with patch("cortexgraph.agents.cluster_detector.get_storage", return_value=storage): # Default min_cluster_size=2 should find 
cluster detector_default = ClusterDetector(dry_run=True, min_cluster_size=2) detector_default._storage = storage # Higher min_cluster_size=3 should not find cluster detector_strict = ClusterDetector(dry_run=True, min_cluster_size=3) detector_strict._storage = storage results_default = detector_default.run() results_strict = detector_strict.run() # Strict threshold should find fewer clusters assert len(results_strict) <= len(results_default)
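
For context, the tests above assume that cluster_memories_simple groups memories by entity overlap rather than embeddings, and that actions follow the cohesion thresholds asserted in test_action_based_on_cohesion (>= 0.75 merge, >= 0.40 link, otherwise ignore). Below is a minimal standalone sketch of that idea using Jaccard similarity over entity sets; the names SketchMemory and cluster_by_entities and the greedy single-pass grouping are illustrative assumptions, not the cortexgraph/mnemex implementation. The file itself would typically be run with pytest, e.g. pytest test_cluster_detector_e2e.py -v (exact path depends on the repository layout).

    # Illustrative sketch only: approximates the entity-overlap clustering the
    # tests above rely on. SketchMemory and cluster_by_entities are hypothetical
    # names, not part of cortexgraph.
    from dataclasses import dataclass


    @dataclass
    class SketchMemory:
        id: str
        entities: frozenset[str]


    def jaccard(a: frozenset[str], b: frozenset[str]) -> float:
        """Jaccard similarity of two entity sets (0.0 when both are empty)."""
        if not a and not b:
            return 0.0
        return len(a & b) / len(a | b)


    def cluster_by_entities(
        memories: list[SketchMemory],
        similarity_threshold: float = 0.20,
        min_cluster_size: int = 2,
    ) -> list[dict]:
        """Greedy single-pass clustering by entity overlap.

        A memory joins the first cluster whose seed it resembles closely
        enough; cohesion is the mean pairwise similarity within the cluster.
        """
        clusters: list[list[SketchMemory]] = []
        for mem in memories:
            for cluster in clusters:
                if jaccard(mem.entities, cluster[0].entities) >= similarity_threshold:
                    cluster.append(mem)
                    break
            else:
                clusters.append([mem])

        results = []
        for cluster in clusters:
            if len(cluster) < min_cluster_size:
                continue  # too small to report (mirrors the minimum-members test)
            pairwise = [
                jaccard(a.entities, b.entities)
                for i, a in enumerate(cluster)
                for b in cluster[i + 1 :]
            ]
            cohesion = sum(pairwise) / len(pairwise)
            # Action thresholds mirror the assertions in test_action_based_on_cohesion.
            if cohesion >= 0.75:
                action = "merge"
            elif cohesion >= 0.40:
                action = "link"
            else:
                action = "ignore"
            results.append(
                {"memory_ids": [m.id for m in cluster], "cohesion": cohesion, "action": action}
            )
        return results


    if __name__ == "__main__":
        mems = [
            SketchMemory("pg-1", frozenset({"PostgreSQL", "Database"})),
            SketchMemory("pg-2", frozenset({"PostgreSQL", "Connection"})),
            SketchMemory("pg-3", frozenset({"PostgreSQL", "Query"})),
            SketchMemory("isolated", frozenset({"Cats"})),
        ]
        # The three PostgreSQL memories group together; "isolated" is dropped
        # because it never reaches min_cluster_size.
        print(cluster_by_entities(mems))

Under this sketch the isolated memory never reaches min_cluster_size, which is the same behavior test_isolated_memory_not_clustered expects from the real detector.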
