# test_hybrid_storage.py (18.4 kB)
#!/usr/bin/env python3
"""
Comprehensive tests for HybridMemoryStorage implementation.
Tests cover:
- Basic storage operations (store, retrieve, delete)
- Background synchronization service
- Failover and graceful degradation
- Configuration and health monitoring
- Performance characteristics
"""
import asyncio
import pytest
import tempfile
import os
import sys
import logging
from pathlib import Path
from unittest.mock import AsyncMock, MagicMock, patch
from typing import Dict, Any
# Make the in-repo ``src`` directory importable so the package under test
# resolves without being installed; it is placed first on sys.path.
current_dir = Path(__file__).parent
src_dir = current_dir.parent.joinpath("src")
sys.path[:0] = [os.fspath(src_dir)]
from mcp_memory_service.storage.hybrid import HybridMemoryStorage, BackgroundSyncService, SyncOperation
from mcp_memory_service.storage.sqlite_vec import SqliteVecMemoryStorage
from mcp_memory_service.models.memory import Memory, MemoryMetadata
# Module-level logger for this test file; root logging is configured at
# DEBUG level (basicConfig is a no-op if the root logger already has handlers).
logger = logging.getLogger(__name__)
logging.basicConfig(level=logging.DEBUG)
class MockCloudflareStorage:
    """In-memory stand-in for the Cloudflare backend used by these tests.

    Memories are kept in ``stored_memories`` keyed by ``content_hash``.
    Failure injection:
      * ``fail_initialization`` -> ``initialize()`` raises.
      * ``fail_operations`` -> ``store``/``delete``/``update_memory_metadata``
        return ``(False, <message>)`` and ``get_stats()`` raises.
    """

    _OPERATION_FAILED = "Mock Cloudflare operation failed"

    def __init__(self, **kwargs):
        # Keyword arguments are accepted (presumably to mirror the real
        # backend's constructor) and otherwise ignored.
        self.stored_memories = {}
        self.initialized = False
        self.fail_initialization = False
        self.fail_operations = False

    async def initialize(self):
        """Mark the mock ready, or raise if initialization failure is simulated."""
        if not self.fail_initialization:
            self.initialized = True
            return
        raise Exception("Mock Cloudflare initialization failed")

    async def store(self, memory: Memory):
        """Record ``memory`` under its content hash; return ``(success, message)``."""
        if self.fail_operations:
            return False, self._OPERATION_FAILED
        self.stored_memories[memory.content_hash] = memory
        return True, "Memory stored successfully"

    async def delete(self, content_hash: str):
        """Remove a stored memory by hash; return ``(success, message)``."""
        if self.fail_operations:
            return False, self._OPERATION_FAILED
        if content_hash not in self.stored_memories:
            return False, "Memory not found"
        del self.stored_memories[content_hash]
        return True, "Memory deleted successfully"

    async def update_memory_metadata(self, content_hash: str, updates: Dict[str, Any], preserve_timestamps: bool = True):
        """Report whether an update would succeed; ``updates`` are not applied."""
        if self.fail_operations:
            return False, self._OPERATION_FAILED
        known = content_hash in self.stored_memories
        return (True, "Memory updated successfully") if known else (False, "Memory not found")

    async def get_stats(self):
        """Return a small stats dict, or raise when failures are simulated."""
        if self.fail_operations:
            raise Exception("Mock Cloudflare stats failed")
        memory_count = len(self.stored_memories)
        return {"total_memories": memory_count, "storage_backend": "MockCloudflareStorage"}

    async def close(self):
        """No-op: the mock holds no external resources."""
@pytest.fixture
def temp_sqlite_db():
    """Yield the path of a fresh temporary SQLite database file.

    This is a plain (sync) fixture: it performs no awaits, and a sync fixture
    can be requested by both sync and async fixtures/tests regardless of the
    pytest-asyncio mode (an ``async def`` under plain ``@pytest.fixture`` is
    not resolved in strict mode).

    After the test, the database file is removed together with any
    WAL/SHM/journal side files SQLite may have created next to it.
    """
    with tempfile.NamedTemporaryFile(suffix='.db', delete=False) as tmp_file:
        db_path = tmp_file.name
    yield db_path
    # Cleanup: the main file plus SQLite's possible sidecar files.
    for path in (db_path, f"{db_path}-wal", f"{db_path}-shm", f"{db_path}-journal"):
        if os.path.exists(path):
            os.unlink(path)
@pytest.fixture
def mock_cloudflare_config():
    """Return dummy Cloudflare credentials for constructing the hybrid storage.

    Sync fixture: nothing here is awaited, so declaring it ``async`` only made
    it depend on pytest-asyncio's auto mode to be resolved.
    """
    return {
        'api_token': 'test_token',
        'account_id': 'test_account',
        'vectorize_index': 'test_index',
        'd1_database_id': 'test_db_id'
    }
@pytest.fixture
async def hybrid_storage(temp_sqlite_db, mock_cloudflare_config):
    """Yield an initialized HybridMemoryStorage wired to MockCloudflareStorage.

    ``CloudflareStorage`` in the hybrid module is patched for the fixture's
    whole lifetime, so any Cloudflare backend the storage constructs is a
    MockCloudflareStorage. The storage is closed after the test.

    NOTE(review): an async fixture under plain ``@pytest.fixture`` presumably
    relies on pytest-asyncio ``asyncio_mode = auto`` -- confirm project config.
    """
    options = {
        "sqlite_db_path": temp_sqlite_db,
        "embedding_model": "all-MiniLM-L6-v2",
        "cloudflare_config": mock_cloudflare_config,
        "sync_interval": 1,  # Short interval for testing
        "batch_size": 5,
    }
    with patch('mcp_memory_service.storage.hybrid.CloudflareStorage', MockCloudflareStorage):
        storage = HybridMemoryStorage(**options)
        await storage.initialize()
        yield storage
        await storage.close()
@pytest.fixture
def sample_memory():
    """Return a new Memory of type ``test`` tagged ``test`` and ``sample``."""
    return Memory(
        content="This is a test memory for hybrid storage",
        metadata=MemoryMetadata(
            memory_type="test",
            tags=["test", "sample"],
            created_at=1638360000.0,
        ),
    )
class TestHybridMemoryStorage:
    """Test cases for HybridMemoryStorage functionality."""

    @pytest.mark.asyncio
    async def test_initialization_with_cloudflare(self, temp_sqlite_db, mock_cloudflare_config):
        """Test successful initialization with Cloudflare configuration."""
        with patch('mcp_memory_service.storage.hybrid.CloudflareStorage', MockCloudflareStorage):
            storage = HybridMemoryStorage(
                sqlite_db_path=temp_sqlite_db,
                cloudflare_config=mock_cloudflare_config
            )
            await storage.initialize()
            try:
                assert storage.initialized
                assert storage.primary is not None
                assert storage.secondary is not None
                assert storage.sync_service is not None
                assert storage.sync_service.is_running
            finally:
                # Close even when an assertion fails so the running background
                # sync service does not outlive the test.
                await storage.close()

    @pytest.mark.asyncio
    async def test_initialization_without_cloudflare(self, temp_sqlite_db):
        """Test initialization without Cloudflare configuration (SQLite-only mode)."""
        storage = HybridMemoryStorage(sqlite_db_path=temp_sqlite_db)
        await storage.initialize()
        try:
            assert storage.initialized
            assert storage.primary is not None
            assert storage.secondary is None
            assert storage.sync_service is None
        finally:
            await storage.close()

    @pytest.mark.asyncio
    async def test_initialization_with_cloudflare_failure(self, temp_sqlite_db, mock_cloudflare_config):
        """Test graceful handling of Cloudflare initialization failure."""
        def failing_cloudflare_storage(**kwargs):
            storage = MockCloudflareStorage(**kwargs)
            storage.fail_initialization = True
            return storage

        with patch('mcp_memory_service.storage.hybrid.CloudflareStorage', failing_cloudflare_storage):
            storage = HybridMemoryStorage(
                sqlite_db_path=temp_sqlite_db,
                cloudflare_config=mock_cloudflare_config
            )
            await storage.initialize()
            try:
                # Should fall back to SQLite-only mode
                assert storage.initialized
                assert storage.primary is not None
                assert storage.secondary is None
                assert storage.sync_service is None
            finally:
                await storage.close()

    @pytest.mark.asyncio
    async def test_store_memory(self, hybrid_storage, sample_memory):
        """Test storing a memory in hybrid storage."""
        success, message = await hybrid_storage.store(sample_memory)
        assert success
        assert "success" in message.lower() or message == ""
        # Verify memory is stored in primary
        results = await hybrid_storage.retrieve(sample_memory.content, n_results=1)
        assert len(results) == 1
        assert results[0].memory.content == sample_memory.content

    @pytest.mark.asyncio
    async def test_retrieve_memory(self, hybrid_storage, sample_memory):
        """Test retrieving memories from hybrid storage."""
        await hybrid_storage.store(sample_memory)
        results = await hybrid_storage.retrieve("test memory", n_results=5)
        assert len(results) >= 1
        # Check that we get the stored memory
        assert any(result.memory.content == sample_memory.content for result in results)

    @pytest.mark.asyncio
    async def test_delete_memory(self, hybrid_storage, sample_memory):
        """Test deleting a memory from hybrid storage."""
        await hybrid_storage.store(sample_memory)
        success, _ = await hybrid_storage.delete(sample_memory.content_hash)
        assert success
        # Verify memory is deleted from primary
        results = await hybrid_storage.retrieve(sample_memory.content, n_results=1)
        assert not any(
            result.memory.content_hash == sample_memory.content_hash for result in results
        )

    @pytest.mark.asyncio
    async def test_search_by_tags(self, hybrid_storage, sample_memory):
        """Test searching memories by tags."""
        await hybrid_storage.store(sample_memory)
        results = await hybrid_storage.search_by_tags(["test"])
        assert len(results) >= 1
        # Check that we get the stored memory
        assert any(memory.content == sample_memory.content for memory in results)

    @pytest.mark.asyncio
    async def test_get_stats(self, hybrid_storage):
        """Test getting statistics from hybrid storage."""
        stats = await hybrid_storage.get_stats()
        assert "storage_backend" in stats
        assert stats["storage_backend"] == "Hybrid (SQLite-vec + Cloudflare)"
        assert "primary_stats" in stats
        assert "sync_enabled" in stats
        assert stats["sync_enabled"] is True
        assert "sync_status" in stats

    @pytest.mark.asyncio
    async def test_force_sync(self, hybrid_storage, sample_memory):
        """Test forcing immediate synchronization."""
        await hybrid_storage.store(sample_memory)
        result = await hybrid_storage.force_sync()
        assert "status" in result
        assert result["status"] in ["completed", "partial"]
        assert "primary_memories" in result
        assert result["primary_memories"] >= 1
class TestBackgroundSyncService:
    """Test cases for BackgroundSyncService functionality."""

    @staticmethod
    async def _poll_sync_status(sync_service, predicate, timeout=3.0, interval=0.05):
        """Poll ``get_sync_status()`` until ``predicate(status)`` holds or ``timeout`` elapses.

        Returns the last status seen. Replaces fixed sleeps, whose length has
        to guess how quickly the background service reacts.
        """
        loop = asyncio.get_running_loop()
        deadline = loop.time() + timeout
        status = await sync_service.get_sync_status()
        while not predicate(status) and loop.time() < deadline:
            await asyncio.sleep(interval)
            status = await sync_service.get_sync_status()
        return status

    @pytest.fixture
    async def sync_service_components(self, temp_sqlite_db):
        """Yield ``(primary, secondary, sync_service)`` for sync-service tests.

        ``primary`` is an initialized SqliteVecMemoryStorage, ``secondary`` an
        initialized MockCloudflareStorage; the service itself is not started.
        """
        primary = SqliteVecMemoryStorage(temp_sqlite_db)
        await primary.initialize()
        secondary = MockCloudflareStorage()
        await secondary.initialize()
        sync_service = BackgroundSyncService(
            primary, secondary,
            sync_interval=1,
            batch_size=3
        )
        yield primary, secondary, sync_service
        try:
            if sync_service.is_running:
                await sync_service.stop()
        finally:
            # Close the SQLite store even if stopping the service fails.
            if hasattr(primary, 'close'):
                await primary.close()

    @pytest.mark.asyncio
    async def test_sync_service_start_stop(self, sync_service_components):
        """Test starting and stopping the background sync service."""
        _, _, sync_service = sync_service_components
        await sync_service.start()
        assert sync_service.is_running
        await sync_service.stop()
        assert not sync_service.is_running

    @pytest.mark.asyncio
    async def test_operation_enqueue(self, sync_service_components, sample_memory):
        """Test enqueuing sync operations."""
        _, _, sync_service = sync_service_components
        await sync_service.start()
        operation = SyncOperation(operation='store', memory=sample_memory)
        await sync_service.enqueue_operation(operation)
        # Give the background task a moment to pick the operation up.
        await asyncio.sleep(0.1)
        status = await sync_service.get_sync_status()
        # Exactly one operation was enqueued: it is either still queued or
        # already drained (the former `>= 0` check could never fail).
        assert 0 <= status['queue_size'] <= 1
        await sync_service.stop()

    @pytest.mark.asyncio
    async def test_sync_with_cloudflare_failure(self, sync_service_components):
        """Test sync behavior when Cloudflare operations fail."""
        _, secondary, sync_service = sync_service_components
        # Make Cloudflare operations fail
        secondary.fail_operations = True
        await sync_service.start()
        metadata = MemoryMetadata(memory_type="test", tags=["test"])
        memory = Memory(content="test content", metadata=metadata)
        operation = SyncOperation(operation='store', memory=memory)
        await sync_service.enqueue_operation(operation)
        # Wait (bounded) for the service to react to the failure instead of
        # relying on a fixed sleep shorter than the 1 s sync interval.
        status = await self._poll_sync_status(
            sync_service, lambda s: s['cloudflare_available'] is False
        )
        assert status['cloudflare_available'] is False
        await sync_service.stop()

    @pytest.mark.asyncio
    async def test_force_sync_functionality(self, sync_service_components):
        """Test force sync functionality."""
        primary, _, sync_service = sync_service_components
        metadata = MemoryMetadata(memory_type="test", tags=["test"])
        await primary.store(Memory(content="test memory 1", metadata=metadata))
        await primary.store(Memory(content="test memory 2", metadata=metadata))
        await sync_service.start()
        result = await sync_service.force_sync()
        assert result['status'] == 'completed'
        assert result['primary_memories'] == 2
        assert result['synced_to_secondary'] >= 0
        await sync_service.stop()

    @pytest.mark.asyncio
    async def test_sync_status_reporting(self, sync_service_components):
        """Test sync status reporting functionality."""
        _, _, sync_service = sync_service_components
        await sync_service.start()
        status = await sync_service.get_sync_status()
        assert 'is_running' in status
        assert status['is_running'] is True
        assert 'queue_size' in status
        assert 'stats' in status
        assert 'cloudflare_available' in status
        await sync_service.stop()
class TestPerformanceCharacteristics:
    """Test performance characteristics of hybrid storage."""

    # Upper bound, in seconds, for a single local SQLite-vec read or write.
    LOCAL_OP_BUDGET_SECONDS = 0.1

    @pytest.mark.asyncio
    async def test_read_performance(self, hybrid_storage, sample_memory):
        """Test that reads are fast (should use SQLite-vec)."""
        import time

        await hybrid_storage.store(sample_memory)
        # perf_counter is monotonic and high-resolution, so clock adjustments
        # cannot distort the measured duration (unlike time.time()).
        start = time.perf_counter()
        results = await hybrid_storage.retrieve(sample_memory.content[:10], n_results=1)
        duration = time.perf_counter() - start
        assert duration < self.LOCAL_OP_BUDGET_SECONDS
        # A partial-content query may or may not match; n_results=1 bounds the
        # count (the former `>= 0` check could never fail).
        assert len(results) <= 1

    @pytest.mark.asyncio
    async def test_write_performance(self, hybrid_storage):
        """Test that writes are fast (immediate SQLite-vec write)."""
        import time

        metadata = MemoryMetadata(memory_type="performance_test", tags=["perf"])
        memory = Memory(content="Performance test memory", metadata=metadata)
        start = time.perf_counter()
        success, _ = await hybrid_storage.store(memory)
        duration = time.perf_counter() - start
        assert duration < self.LOCAL_OP_BUDGET_SECONDS
        assert success

    @pytest.mark.asyncio
    async def test_concurrent_operations(self, hybrid_storage):
        """Test concurrent memory operations."""
        memories = [
            Memory(
                content=f"Concurrent test memory {i}",
                metadata=MemoryMetadata(memory_type="concurrent_test", tags=["concurrent", f"test{i}"]),
            )
            for i in range(10)
        ]
        # Store all memories concurrently
        results = await asyncio.gather(*(hybrid_storage.store(memory) for memory in memories))
        assert all(success for success, _ in results)
        # Should be able to retrieve all memories
        search_results = await hybrid_storage.search_by_tags(["concurrent"])
        assert len(search_results) == len(memories)
class TestErrorHandlingAndFallback:
    """Test error handling and fallback scenarios."""

    @pytest.mark.asyncio
    async def test_sqlite_only_mode(self, temp_sqlite_db):
        """Test operation in SQLite-only mode (no Cloudflare)."""
        storage = HybridMemoryStorage(sqlite_db_path=temp_sqlite_db)
        await storage.initialize()
        try:
            # Should work normally without Cloudflare
            metadata = MemoryMetadata(memory_type="sqlite_only", tags=["local"])
            memory = Memory(content="SQLite-only test memory", metadata=metadata)
            success, _ = await storage.store(memory)
            assert success
            results = await storage.retrieve(memory.content, n_results=1)
            assert len(results) >= 1
        finally:
            await storage.close()

    @pytest.mark.asyncio
    async def test_graceful_degradation(self, temp_sqlite_db, mock_cloudflare_config):
        """Test graceful degradation when Cloudflare becomes unavailable.

        The mock backend starts out healthy; failures are injected mid-test by
        setting its ``fail_operations`` flag.
        """
        with patch('mcp_memory_service.storage.hybrid.CloudflareStorage', MockCloudflareStorage):
            storage = HybridMemoryStorage(
                sqlite_db_path=temp_sqlite_db,
                cloudflare_config=mock_cloudflare_config
            )
            await storage.initialize()
            try:
                # Initially should work
                metadata = MemoryMetadata(memory_type="degradation_test", tags=["test"])
                memory = Memory(content="Degradation test memory", metadata=metadata)
                success, _ = await storage.store(memory)
                assert success
                # Make Cloudflare fail from here on.
                assert storage.secondary is not None
                storage.secondary.fail_operations = True
                # Should still work (primary storage unaffected)
                memory2 = Memory(content="Second test memory", metadata=metadata)
                success, _ = await storage.store(memory2)
                assert success
                # Retrieval should still work
                results = await storage.retrieve("test memory", n_results=10)
                assert len(results) >= 2
            finally:
                await storage.close()
if __name__ == "__main__":
    # Run tests, propagating pytest's exit status so shell/CI callers see failures.
    sys.exit(pytest.main([__file__, "-v"]))