test_hybrid_storage.py•18.4 kB
#!/usr/bin/env python3
"""
Comprehensive tests for HybridMemoryStorage implementation.
Tests cover:
- Basic storage operations (store, retrieve, delete)
- Background synchronization service
- Failover and graceful degradation
- Configuration and health monitoring
- Performance characteristics
"""
import asyncio
import pytest
import tempfile
import os
import sys
import logging
from pathlib import Path
from unittest.mock import AsyncMock, MagicMock, patch
from typing import Dict, Any
# Add src to path for imports
current_dir = Path(__file__).parent
src_dir = current_dir.parent / "src"
sys.path.insert(0, str(src_dir))
from mcp_memory_service.storage.hybrid import HybridMemoryStorage, BackgroundSyncService, SyncOperation
from mcp_memory_service.storage.sqlite_vec import SqliteVecMemoryStorage
from mcp_memory_service.models.memory import Memory, MemoryMetadata
# Configure test logging
logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger(__name__)
class MockCloudflareStorage:
    """Mock Cloudflare storage for testing."""
    def __init__(self, **kwargs):
        self.initialized = False
        self.stored_memories = {}
        self.fail_operations = False
        self.fail_initialization = False
    async def initialize(self):
        if self.fail_initialization:
            raise Exception("Mock Cloudflare initialization failed")
        self.initialized = True
    async def store(self, memory: Memory):
        if self.fail_operations:
            return False, "Mock Cloudflare operation failed"
        self.stored_memories[memory.content_hash] = memory
        return True, "Memory stored successfully"
    async def delete(self, content_hash: str):
        if self.fail_operations:
            return False, "Mock Cloudflare operation failed"
        if content_hash in self.stored_memories:
            del self.stored_memories[content_hash]
            return True, "Memory deleted successfully"
        return False, "Memory not found"
    async def update_memory_metadata(self, content_hash: str, updates: Dict[str, Any], preserve_timestamps: bool = True):
        if self.fail_operations:
            return False, "Mock Cloudflare operation failed"
        if content_hash in self.stored_memories:
            # Simple mock update
            return True, "Memory updated successfully"
        return False, "Memory not found"
    async def get_stats(self):
        if self.fail_operations:
            raise Exception("Mock Cloudflare stats failed")
        return {
            "total_memories": len(self.stored_memories),
            "storage_backend": "MockCloudflareStorage"
        }
    async def close(self):
        pass
@pytest.fixture
async def temp_sqlite_db():
    """Create a temporary SQLite database for testing."""
    with tempfile.NamedTemporaryFile(suffix='.db', delete=False) as tmp_file:
        db_path = tmp_file.name
    yield db_path
    # Cleanup
    if os.path.exists(db_path):
        os.unlink(db_path)
@pytest.fixture
async def mock_cloudflare_config():
    """Mock Cloudflare configuration for testing."""
    return {
        'api_token': 'test_token',
        'account_id': 'test_account',
        'vectorize_index': 'test_index',
        'd1_database_id': 'test_db_id'
    }
@pytest.fixture
async def hybrid_storage(temp_sqlite_db, mock_cloudflare_config):
    """Create a HybridMemoryStorage instance for testing."""
    with patch('mcp_memory_service.storage.hybrid.CloudflareStorage', MockCloudflareStorage):
        storage = HybridMemoryStorage(
            sqlite_db_path=temp_sqlite_db,
            embedding_model="all-MiniLM-L6-v2",
            cloudflare_config=mock_cloudflare_config,
            sync_interval=1,  # Short interval for testing
            batch_size=5
        )
        await storage.initialize()
        yield storage
        await storage.close()
@pytest.fixture
def sample_memory():
    """Create a sample memory for testing."""
    metadata = MemoryMetadata(
        memory_type="test",
        tags=["test", "sample"],
        created_at=1638360000.0
    )
    return Memory(
        content="This is a test memory for hybrid storage",
        metadata=metadata
    )
class TestHybridMemoryStorage:
    """Test cases for HybridMemoryStorage functionality."""
    @pytest.mark.asyncio
    async def test_initialization_with_cloudflare(self, temp_sqlite_db, mock_cloudflare_config):
        """Test successful initialization with Cloudflare configuration."""
        with patch('mcp_memory_service.storage.hybrid.CloudflareStorage', MockCloudflareStorage):
            storage = HybridMemoryStorage(
                sqlite_db_path=temp_sqlite_db,
                cloudflare_config=mock_cloudflare_config
            )
            await storage.initialize()
            assert storage.initialized
            assert storage.primary is not None
            assert storage.secondary is not None
            assert storage.sync_service is not None
            assert storage.sync_service.is_running
            await storage.close()
    @pytest.mark.asyncio
    async def test_initialization_without_cloudflare(self, temp_sqlite_db):
        """Test initialization without Cloudflare configuration (SQLite-only mode)."""
        storage = HybridMemoryStorage(sqlite_db_path=temp_sqlite_db)
        await storage.initialize()
        assert storage.initialized
        assert storage.primary is not None
        assert storage.secondary is None
        assert storage.sync_service is None
        await storage.close()
    @pytest.mark.asyncio
    async def test_initialization_with_cloudflare_failure(self, temp_sqlite_db, mock_cloudflare_config):
        """Test graceful handling of Cloudflare initialization failure."""
        def failing_cloudflare_storage(**kwargs):
            storage = MockCloudflareStorage(**kwargs)
            storage.fail_initialization = True
            return storage
        with patch('mcp_memory_service.storage.hybrid.CloudflareStorage', failing_cloudflare_storage):
            storage = HybridMemoryStorage(
                sqlite_db_path=temp_sqlite_db,
                cloudflare_config=mock_cloudflare_config
            )
            await storage.initialize()
            # Should fall back to SQLite-only mode
            assert storage.initialized
            assert storage.primary is not None
            assert storage.secondary is None
            assert storage.sync_service is None
            await storage.close()
    @pytest.mark.asyncio
    async def test_store_memory(self, hybrid_storage, sample_memory):
        """Test storing a memory in hybrid storage."""
        success, message = await hybrid_storage.store(sample_memory)
        assert success
        assert "success" in message.lower() or message == ""
        # Verify memory is stored in primary
        results = await hybrid_storage.retrieve(sample_memory.content, n_results=1)
        assert len(results) == 1
        assert results[0].memory.content == sample_memory.content
    @pytest.mark.asyncio
    async def test_retrieve_memory(self, hybrid_storage, sample_memory):
        """Test retrieving memories from hybrid storage."""
        # Store a memory first
        await hybrid_storage.store(sample_memory)
        # Retrieve by query
        results = await hybrid_storage.retrieve("test memory", n_results=5)
        assert len(results) >= 1
        # Check that we get the stored memory
        found = any(result.memory.content == sample_memory.content for result in results)
        assert found
    @pytest.mark.asyncio
    async def test_delete_memory(self, hybrid_storage, sample_memory):
        """Test deleting a memory from hybrid storage."""
        # Store a memory first
        await hybrid_storage.store(sample_memory)
        # Delete the memory
        success, message = await hybrid_storage.delete(sample_memory.content_hash)
        assert success
        # Verify memory is deleted from primary
        results = await hybrid_storage.retrieve(sample_memory.content, n_results=1)
        # Should not find the deleted memory
        found = any(result.memory.content_hash == sample_memory.content_hash for result in results)
        assert not found
    @pytest.mark.asyncio
    async def test_search_by_tags(self, hybrid_storage, sample_memory):
        """Test searching memories by tags."""
        # Store a memory first
        await hybrid_storage.store(sample_memory)
        # Search by tags
        results = await hybrid_storage.search_by_tags(["test"])
        assert len(results) >= 1
        # Check that we get the stored memory
        found = any(memory.content == sample_memory.content for memory in results)
        assert found
    @pytest.mark.asyncio
    async def test_get_stats(self, hybrid_storage):
        """Test getting statistics from hybrid storage."""
        stats = await hybrid_storage.get_stats()
        assert "storage_backend" in stats
        assert stats["storage_backend"] == "Hybrid (SQLite-vec + Cloudflare)"
        assert "primary_stats" in stats
        assert "sync_enabled" in stats
        assert stats["sync_enabled"] == True
        assert "sync_status" in stats
    @pytest.mark.asyncio
    async def test_force_sync(self, hybrid_storage, sample_memory):
        """Test forcing immediate synchronization."""
        # Store some memories
        await hybrid_storage.store(sample_memory)
        # Force sync
        result = await hybrid_storage.force_sync()
        assert "status" in result
        assert result["status"] in ["completed", "partial"]
        assert "primary_memories" in result
        assert result["primary_memories"] >= 1
class TestBackgroundSyncService:
    """Test cases for BackgroundSyncService functionality."""
    @pytest.fixture
    async def sync_service_components(self, temp_sqlite_db):
        """Create components needed for sync service testing."""
        primary = SqliteVecMemoryStorage(temp_sqlite_db)
        await primary.initialize()
        secondary = MockCloudflareStorage()
        await secondary.initialize()
        sync_service = BackgroundSyncService(
            primary, secondary,
            sync_interval=1,
            batch_size=3
        )
        yield primary, secondary, sync_service
        if sync_service.is_running:
            await sync_service.stop()
        if hasattr(primary, 'close'):
            await primary.close()
    @pytest.mark.asyncio
    async def test_sync_service_start_stop(self, sync_service_components):
        """Test starting and stopping the background sync service."""
        primary, secondary, sync_service = sync_service_components
        # Start service
        await sync_service.start()
        assert sync_service.is_running
        # Stop service
        await sync_service.stop()
        assert not sync_service.is_running
    @pytest.mark.asyncio
    async def test_operation_enqueue(self, sync_service_components, sample_memory):
        """Test enqueuing sync operations."""
        primary, secondary, sync_service = sync_service_components
        await sync_service.start()
        # Enqueue a store operation
        operation = SyncOperation(operation='store', memory=sample_memory)
        await sync_service.enqueue_operation(operation)
        # Wait a bit for processing
        await asyncio.sleep(0.1)
        # Check queue size decreased
        status = await sync_service.get_sync_status()
        assert status['queue_size'] >= 0  # Should be processed or in progress
        await sync_service.stop()
    @pytest.mark.asyncio
    async def test_sync_with_cloudflare_failure(self, sync_service_components):
        """Test sync behavior when Cloudflare operations fail."""
        primary, secondary, sync_service = sync_service_components
        # Make Cloudflare operations fail
        secondary.fail_operations = True
        await sync_service.start()
        # Create a test memory
        metadata = MemoryMetadata(memory_type="test", tags=["test"])
        memory = Memory(content="test content", metadata=metadata)
        # Enqueue operation
        operation = SyncOperation(operation='store', memory=memory)
        await sync_service.enqueue_operation(operation)
        # Wait for processing
        await asyncio.sleep(0.2)
        # Check that service marked Cloudflare as unavailable
        status = await sync_service.get_sync_status()
        assert status['cloudflare_available'] == False
        await sync_service.stop()
    @pytest.mark.asyncio
    async def test_force_sync_functionality(self, sync_service_components):
        """Test force sync functionality."""
        primary, secondary, sync_service = sync_service_components
        # Store some test memories in primary
        metadata = MemoryMetadata(memory_type="test", tags=["test"])
        memory1 = Memory(content="test memory 1", metadata=metadata)
        memory2 = Memory(content="test memory 2", metadata=metadata)
        await primary.store(memory1)
        await primary.store(memory2)
        await sync_service.start()
        # Force sync
        result = await sync_service.force_sync()
        assert result['status'] == 'completed'
        assert result['primary_memories'] == 2
        assert result['synced_to_secondary'] >= 0
        await sync_service.stop()
    @pytest.mark.asyncio
    async def test_sync_status_reporting(self, sync_service_components):
        """Test sync status reporting functionality."""
        primary, secondary, sync_service = sync_service_components
        await sync_service.start()
        status = await sync_service.get_sync_status()
        assert 'is_running' in status
        assert status['is_running'] == True
        assert 'queue_size' in status
        assert 'stats' in status
        assert 'cloudflare_available' in status
        await sync_service.stop()
class TestPerformanceCharacteristics:
    """Test performance characteristics of hybrid storage."""
    @pytest.mark.asyncio
    async def test_read_performance(self, hybrid_storage, sample_memory):
        """Test that reads are fast (should use SQLite-vec)."""
        # Store a memory
        await hybrid_storage.store(sample_memory)
        # Measure read performance
        import time
        start_time = time.time()
        results = await hybrid_storage.retrieve(sample_memory.content[:10], n_results=1)
        duration = time.time() - start_time
        # Should be very fast (< 100ms for local SQLite-vec)
        assert duration < 0.1
        assert len(results) >= 0  # Should get some results
    @pytest.mark.asyncio
    async def test_write_performance(self, hybrid_storage):
        """Test that writes are fast (immediate SQLite-vec write)."""
        metadata = MemoryMetadata(memory_type="performance_test", tags=["perf"])
        memory = Memory(content="Performance test memory", metadata=metadata)
        import time
        start_time = time.time()
        success, message = await hybrid_storage.store(memory)
        duration = time.time() - start_time
        # Should be very fast (< 100ms for local SQLite-vec)
        assert duration < 0.1
        assert success
    @pytest.mark.asyncio
    async def test_concurrent_operations(self, hybrid_storage):
        """Test concurrent memory operations."""
        # Create multiple memories
        memories = []
        for i in range(10):
            metadata = MemoryMetadata(memory_type="concurrent_test", tags=["concurrent", f"test{i}"])
            memory = Memory(content=f"Concurrent test memory {i}", metadata=metadata)
            memories.append(memory)
        # Store all memories concurrently
        tasks = [hybrid_storage.store(memory) for memory in memories]
        results = await asyncio.gather(*tasks)
        # All operations should succeed
        assert all(success for success, message in results)
        # Should be able to retrieve all memories
        search_results = await hybrid_storage.search_by_tags(["concurrent"])
        assert len(search_results) == 10
class TestErrorHandlingAndFallback:
    """Test error handling and fallback scenarios."""
    @pytest.mark.asyncio
    async def test_sqlite_only_mode(self, temp_sqlite_db):
        """Test operation in SQLite-only mode (no Cloudflare)."""
        storage = HybridMemoryStorage(sqlite_db_path=temp_sqlite_db)
        await storage.initialize()
        # Should work normally without Cloudflare
        metadata = MemoryMetadata(memory_type="sqlite_only", tags=["local"])
        memory = Memory(content="SQLite-only test memory", metadata=metadata)
        success, message = await storage.store(memory)
        assert success
        results = await storage.retrieve(memory.content, n_results=1)
        assert len(results) >= 1
        await storage.close()
    @pytest.mark.asyncio
    async def test_graceful_degradation(self, temp_sqlite_db, mock_cloudflare_config):
        """Test graceful degradation when Cloudflare becomes unavailable."""
        def unreliable_cloudflare_storage(**kwargs):
            storage = MockCloudflareStorage(**kwargs)
            # Will start working but then fail
            return storage
        with patch('mcp_memory_service.storage.hybrid.CloudflareStorage', unreliable_cloudflare_storage):
            storage = HybridMemoryStorage(
                sqlite_db_path=temp_sqlite_db,
                cloudflare_config=mock_cloudflare_config
            )
            await storage.initialize()
            # Initially should work
            metadata = MemoryMetadata(memory_type="degradation_test", tags=["test"])
            memory = Memory(content="Degradation test memory", metadata=metadata)
            success, message = await storage.store(memory)
            assert success
            # Make Cloudflare fail
            storage.secondary.fail_operations = True
            # Should still work (primary storage unaffected)
            memory2 = Memory(content="Second test memory", metadata=metadata)
            success, message = await storage.store(memory2)
            assert success
            # Retrieval should still work
            results = await storage.retrieve("test memory", n_results=10)
            assert len(results) >= 2
            await storage.close()
if __name__ == "__main__":
    # Run tests
    pytest.main([__file__, "-v"])