Skip to main content
Glama

MCP Memory Service

"""Integration tests for the main dream-inspired consolidator.""" import pytest from datetime import datetime, timedelta from unittest.mock import AsyncMock, MagicMock from mcp_memory_service.consolidation.consolidator import DreamInspiredConsolidator from mcp_memory_service.consolidation.base import ConsolidationReport from mcp_memory_service.models.memory import Memory @pytest.mark.integration class TestDreamInspiredConsolidator: """Test the main consolidation orchestrator.""" @pytest.fixture def consolidator(self, mock_storage, consolidation_config): return DreamInspiredConsolidator(mock_storage, consolidation_config) @pytest.mark.asyncio async def test_basic_consolidation_pipeline(self, consolidator, mock_storage): """Test the complete consolidation pipeline.""" report = await consolidator.consolidate("weekly") assert isinstance(report, ConsolidationReport) assert report.time_horizon == "weekly" assert isinstance(report.start_time, datetime) assert isinstance(report.end_time, datetime) assert report.end_time >= report.start_time assert report.memories_processed >= 0 assert report.associations_discovered >= 0 assert report.clusters_created >= 0 assert report.memories_compressed >= 0 assert report.memories_archived >= 0 assert isinstance(report.errors, list) assert isinstance(report.performance_metrics, dict) @pytest.mark.asyncio async def test_daily_consolidation(self, consolidator): """Test daily consolidation (light processing).""" report = await consolidator.consolidate("daily") assert report.time_horizon == "daily" # Daily consolidation should be lighter - less intensive operations assert isinstance(report, ConsolidationReport) @pytest.mark.asyncio async def test_weekly_consolidation(self, consolidator): """Test weekly consolidation (includes associations).""" report = await consolidator.consolidate("weekly") assert report.time_horizon == "weekly" # Weekly should include association discovery assert isinstance(report, ConsolidationReport) @pytest.mark.asyncio async def test_monthly_consolidation(self, consolidator): """Test monthly consolidation (includes forgetting).""" report = await consolidator.consolidate("monthly") assert report.time_horizon == "monthly" # Monthly should include more comprehensive processing assert isinstance(report, ConsolidationReport) @pytest.mark.asyncio async def test_quarterly_consolidation(self, consolidator): """Test quarterly consolidation (deep processing).""" report = await consolidator.consolidate("quarterly") assert report.time_horizon == "quarterly" # Quarterly should include all processing steps assert isinstance(report, ConsolidationReport) @pytest.mark.asyncio async def test_yearly_consolidation(self, consolidator): """Test yearly consolidation (full processing).""" report = await consolidator.consolidate("yearly") assert report.time_horizon == "yearly" # Yearly should include comprehensive forgetting assert isinstance(report, ConsolidationReport) @pytest.mark.asyncio async def test_invalid_time_horizon(self, consolidator): """Test handling of invalid time horizon.""" from mcp_memory_service.consolidation.base import ConsolidationError with pytest.raises(ConsolidationError): await consolidator.consolidate("invalid_horizon") @pytest.mark.asyncio async def test_empty_memory_set(self, consolidation_config): """Test consolidation with empty memory set.""" # Create storage with no memories empty_storage = AsyncMock() empty_storage.get_all_memories.return_value = [] empty_storage.get_memories_by_time_range.return_value = [] empty_storage.get_memory_connections.return_value = {} empty_storage.get_access_patterns.return_value = {} consolidator = DreamInspiredConsolidator(empty_storage, consolidation_config) report = await consolidator.consolidate("weekly") assert report.memories_processed == 0 assert report.associations_discovered == 0 assert report.clusters_created == 0 assert report.memories_compressed == 0 assert report.memories_archived == 0 @pytest.mark.asyncio async def test_memories_by_time_range_retrieval(self, consolidator, mock_storage): """Test retrieval of memories by time range for daily processing.""" # Mock the time range method to return specific memories recent_memories = mock_storage.memories.copy() mock_storage.get_memories_by_time_range = AsyncMock(return_value=list(recent_memories.values())[:3]) report = await consolidator.consolidate("daily") # Should have called the time range method for daily processing mock_storage.get_memories_by_time_range.assert_called_once() assert report.memories_processed >= 0 @pytest.mark.asyncio async def test_association_storage(self, consolidator, mock_storage): """Test that discovered associations are stored as memories.""" original_memory_count = len(mock_storage.memories) # Run consolidation that should discover associations report = await consolidator.consolidate("weekly") # Check if new association memories were added current_memory_count = len(mock_storage.memories) # May or may not find associations depending on similarity # Just ensure no errors occurred assert isinstance(report, ConsolidationReport) assert current_memory_count >= original_memory_count @pytest.mark.asyncio async def test_health_check(self, consolidator): """Test consolidation system health check.""" health = await consolidator.health_check() assert isinstance(health, dict) assert "status" in health assert "timestamp" in health assert "components" in health assert "statistics" in health # Check component health expected_components = [ "decay_calculator", "association_engine", "clustering_engine", "compression_engine", "forgetting_engine" ] for component in expected_components: assert component in health["components"] assert "status" in health["components"][component] @pytest.mark.asyncio async def test_consolidation_recommendations(self, consolidator): """Test consolidation recommendations.""" recommendations = await consolidator.get_consolidation_recommendations("weekly") assert isinstance(recommendations, dict) assert "recommendation" in recommendations assert "memory_count" in recommendations # Check recommendation types valid_recommendations = ["no_action", "consolidation_beneficial", "optional", "error"] assert recommendations["recommendation"] in valid_recommendations if recommendations["recommendation"] != "error": assert "reasons" in recommendations assert isinstance(recommendations["reasons"], list) @pytest.mark.asyncio async def test_performance_metrics(self, consolidator): """Test performance metrics collection.""" report = await consolidator.consolidate("daily") assert "performance_metrics" in report.__dict__ metrics = report.performance_metrics assert "duration_seconds" in metrics assert "memories_per_second" in metrics assert "success" in metrics assert isinstance(metrics["duration_seconds"], float) assert metrics["duration_seconds"] >= 0 assert isinstance(metrics["memories_per_second"], (int, float)) assert isinstance(metrics["success"], bool) @pytest.mark.asyncio async def test_consolidation_statistics_tracking(self, consolidator): """Test that consolidation statistics are tracked.""" initial_stats = consolidator.consolidation_stats.copy() # Run consolidation await consolidator.consolidate("weekly") # Check that stats were updated assert consolidator.consolidation_stats["total_runs"] == initial_stats["total_runs"] + 1 # Check other stats (may or may not be incremented depending on processing) for key in ["successful_runs", "total_memories_processed", "total_associations_created"]: assert consolidator.consolidation_stats[key] >= initial_stats[key] @pytest.mark.asyncio async def test_error_handling_in_pipeline(self, consolidation_config): """Test error handling in the consolidation pipeline.""" # Create storage that raises errors error_storage = AsyncMock() error_storage.get_all_memories.side_effect = Exception("Storage error") error_storage.get_memories_by_time_range.side_effect = Exception("Storage error") consolidator = DreamInspiredConsolidator(error_storage, consolidation_config) report = await consolidator.consolidate("weekly") # Should handle errors gracefully assert len(report.errors) > 0 assert report.performance_metrics["success"] is False @pytest.mark.asyncio async def test_component_integration(self, consolidator, mock_storage): """Test integration between different consolidation components.""" # Ensure we have enough memories for meaningful processing if len(mock_storage.memories) < 5: # Add more memories for testing base_time = datetime.now().timestamp() for i in range(10): memory = Memory( content=f"Integration test memory {i} with content", content_hash=f"integration_{i}", tags=["integration", "test"], embedding=[0.1 + i*0.01] * 320, created_at=base_time - (i * 3600) ) mock_storage.memories[memory.content_hash] = memory # Run full consolidation report = await consolidator.consolidate("monthly") # Verify that components worked together assert report.memories_processed > 0 # Check that the pipeline completed successfully assert report.performance_metrics["success"] is True @pytest.mark.asyncio async def test_time_horizon_specific_processing(self, consolidator): """Test that different time horizons trigger appropriate processing.""" # Test that weekly includes associations but not intensive forgetting weekly_report = await consolidator.consolidate("weekly") # Test that monthly includes forgetting monthly_report = await consolidator.consolidate("monthly") # Both should complete successfully assert weekly_report.performance_metrics["success"] is True assert monthly_report.performance_metrics["success"] is True # Monthly might have more archived memories (if forgetting triggered) # But this depends on the actual memory state, so just verify structure assert isinstance(weekly_report.memories_archived, int) assert isinstance(monthly_report.memories_archived, int) @pytest.mark.asyncio async def test_concurrent_consolidation_prevention(self, consolidator): """Test that the system handles concurrent consolidation requests appropriately.""" # Start two consolidations concurrently import asyncio task1 = asyncio.create_task(consolidator.consolidate("daily")) task2 = asyncio.create_task(consolidator.consolidate("weekly")) # Both should complete (the system should handle concurrency) report1, report2 = await asyncio.gather(task1, task2) assert isinstance(report1, ConsolidationReport) assert isinstance(report2, ConsolidationReport) assert report1.time_horizon == "daily" assert report2.time_horizon == "weekly" @pytest.mark.asyncio async def test_memory_metadata_updates(self, consolidator, mock_storage): """Test that memory metadata is updated during consolidation.""" original_memories = list(mock_storage.memories.values()) # Run consolidation await consolidator.consolidate("weekly") # Check that memories exist (update_memory would have been called internally) # Since the mock doesn't track calls, we just verify the process completed current_memories = list(mock_storage.memories.values()) assert len(current_memories) >= len(original_memories) @pytest.mark.asyncio async def test_large_memory_set_performance(self, consolidation_config, mock_large_storage): """Test performance with larger memory sets.""" consolidator = DreamInspiredConsolidator(mock_large_storage, consolidation_config) start_time = datetime.now() report = await consolidator.consolidate("weekly") end_time = datetime.now() duration = (end_time - start_time).total_seconds() # Should complete within reasonable time (adjust threshold as needed) assert duration < 30 # 30 seconds for 100 memories assert report.memories_processed > 0 assert report.performance_metrics["success"] is True # Performance should be reasonable if report.memories_processed > 0: memories_per_second = report.memories_processed / duration assert memories_per_second > 1 # At least 1 memory per second @pytest.mark.asyncio async def test_consolidation_report_completeness(self, consolidator): """Test that consolidation reports contain all expected information.""" report = await consolidator.consolidate("weekly") # Check all required fields required_fields = [ "time_horizon", "start_time", "end_time", "memories_processed", "associations_discovered", "clusters_created", "memories_compressed", "memories_archived", "errors", "performance_metrics" ] for field in required_fields: assert hasattr(report, field), f"Missing field: {field}" assert getattr(report, field) is not None, f"Field {field} is None" # Check performance metrics perf_metrics = report.performance_metrics assert "duration_seconds" in perf_metrics assert "memories_per_second" in perf_metrics assert "success" in perf_metrics @pytest.mark.asyncio async def test_storage_backend_integration(self, consolidator, mock_storage): """Test integration with storage backend methods.""" # Run consolidation report = await consolidator.consolidate("monthly") # Verify storage integration worked (memories were processed) assert report.memories_processed >= 0 assert isinstance(report.performance_metrics, dict) # Verify storage backend has the expected methods assert hasattr(mock_storage, 'get_all_memories') assert hasattr(mock_storage, 'get_memories_by_time_range') assert hasattr(mock_storage, 'get_memory_connections') assert hasattr(mock_storage, 'get_access_patterns') assert hasattr(mock_storage, 'update_memory') @pytest.mark.asyncio async def test_configuration_impact(self, mock_storage): """Test that configuration changes affect consolidation behavior.""" # Create two different configurations config1 = type('Config', (), { 'decay_enabled': True, 'associations_enabled': True, 'clustering_enabled': True, 'compression_enabled': True, 'forgetting_enabled': True, 'retention_periods': {'standard': 30}, 'min_similarity': 0.3, 'max_similarity': 0.7, 'max_pairs_per_run': 50, 'min_cluster_size': 3, 'clustering_algorithm': 'simple', 'max_summary_length': 200, 'preserve_originals': True, 'relevance_threshold': 0.1, 'access_threshold_days': 30, 'archive_location': None })() config2 = type('Config', (), { 'decay_enabled': False, 'associations_enabled': False, 'clustering_enabled': False, 'compression_enabled': False, 'forgetting_enabled': False, 'retention_periods': {'standard': 30}, 'min_similarity': 0.3, 'max_similarity': 0.7, 'max_pairs_per_run': 50, 'min_cluster_size': 3, 'clustering_algorithm': 'simple', 'max_summary_length': 200, 'preserve_originals': True, 'relevance_threshold': 0.1, 'access_threshold_days': 30, 'archive_location': None })() consolidator1 = DreamInspiredConsolidator(mock_storage, config1) consolidator2 = DreamInspiredConsolidator(mock_storage, config2) # Both should work, but may produce different results report1 = await consolidator1.consolidate("weekly") report2 = await consolidator2.consolidate("weekly") assert isinstance(report1, ConsolidationReport) assert isinstance(report2, ConsolidationReport) # With disabled features, the second consolidator might process differently # but both should complete successfully assert report1.performance_metrics["success"] is True assert report2.performance_metrics["success"] is True

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/doobidoo/mcp-memory-service'

If you have feedback or need assistance with the MCP directory API, please join our Discord server