test_hybrid_cloudflare_limits.py•12.4 kB
#!/usr/bin/env python3
"""
Tests for Cloudflare limit handling in hybrid storage.
Tests cover:
- Pre-sync validation for oversized metadata
- Vector count limit enforcement
- Capacity monitoring and warnings
- Error handling for limit-related failures
"""
import asyncio
import pytest
import pytest_asyncio
import json
import tempfile
import os
import sys
from pathlib import Path
from unittest.mock import AsyncMock, MagicMock, patch
# Add src to path for imports
current_dir = Path(__file__).parent
src_dir = current_dir.parent / "src"
sys.path.insert(0, str(src_dir))
from mcp_memory_service.storage.hybrid import HybridMemoryStorage, BackgroundSyncService, SyncOperation
from mcp_memory_service.models.memory import Memory
import hashlib
class MockCloudflareWithLimits:
"""Mock Cloudflare storage that simulates limit errors."""
def __init__(self, **kwargs):
self.memories = {}
self.vector_count = 0
self.max_vectors = 100 # Low limit for testing
self.initialized = False
self.fail_on_limit = True
async def initialize(self):
self.initialized = True
async def store(self, memory: Memory):
# Check vector limit
if self.vector_count >= self.max_vectors and self.fail_on_limit:
raise Exception("413 Request Entity Too Large: Vector limit exceeded")
# Check metadata size (simulate 10KB limit)
if memory.metadata:
metadata_json = json.dumps(memory.metadata)
if len(metadata_json) > 10240: # 10KB
raise Exception("Metadata too large: exceeds 10KB limit")
self.memories[memory.content_hash] = memory
self.vector_count += 1
return True, "Stored"
async def delete(self, content_hash: str):
if content_hash in self.memories:
del self.memories[content_hash]
self.vector_count -= 1
return True, "Deleted"
async def get_stats(self):
return {
"total_memories": self.vector_count,
"storage_backend": "MockCloudflareWithLimits"
}
async def update_memory_metadata(self, content_hash: str, updates, preserve_timestamps=True):
return True, "Updated"
async def close(self):
pass
@pytest_asyncio.fixture
async def temp_db():
"""Create a temporary SQLite database."""
with tempfile.NamedTemporaryFile(suffix='.db', delete=False) as tmp:
db_path = tmp.name
yield db_path
if os.path.exists(db_path):
os.unlink(db_path)
@pytest_asyncio.fixture
async def hybrid_with_limits(temp_db):
"""Create hybrid storage with limit-aware mock Cloudflare."""
config = {
'api_token': 'test',
'account_id': 'test',
'vectorize_index': 'test',
'd1_database_id': 'test'
}
with patch('mcp_memory_service.storage.hybrid.CloudflareStorage', MockCloudflareWithLimits):
with patch('mcp_memory_service.storage.hybrid.CLOUDFLARE_VECTORIZE_MAX_VECTORS', 100):
with patch('mcp_memory_service.storage.hybrid.CLOUDFLARE_MAX_METADATA_SIZE_KB', 10):
storage = HybridMemoryStorage(
sqlite_db_path=temp_db,
cloudflare_config=config,
sync_interval=300,
batch_size=5
)
await storage.initialize()
yield storage
await storage.close()
class TestCloudflareMetadataLimits:
"""Test metadata size validation."""
@pytest.mark.asyncio
async def test_oversized_metadata_validation(self, hybrid_with_limits):
"""Test that oversized metadata is caught during validation."""
# Create memory with large metadata (> 10KB)
large_metadata = {"data": "x" * 11000} # Over 10KB when serialized
memory = Memory(
content="Test memory with large metadata",
content_hash=hashlib.sha256(b"test").hexdigest(),
tags=["test"],
metadata=large_metadata
)
# Validation should fail
is_valid, error = await hybrid_with_limits.sync_service.validate_memory_for_cloudflare(memory)
assert not is_valid
assert "exceeds Cloudflare limit" in error
@pytest.mark.asyncio
async def test_normal_metadata_passes_validation(self, hybrid_with_limits):
"""Test that normal-sized metadata passes validation."""
normal_metadata = {"key": "value", "index": 123}
memory = Memory(
content="Test memory with normal metadata",
content_hash=hashlib.sha256(b"test2").hexdigest(),
tags=["test"],
metadata=normal_metadata
)
# Validation should pass
is_valid, error = await hybrid_with_limits.sync_service.validate_memory_for_cloudflare(memory)
assert is_valid
assert error is None
class TestVectorCountLimits:
"""Test vector count limit handling."""
@pytest.mark.asyncio
async def test_vector_limit_detection(self, hybrid_with_limits):
"""Test detection when approaching vector count limit."""
# Simulate high vector count
hybrid_with_limits.sync_service.cloudflare_stats['vector_count'] = 95
# Check capacity should detect we're at 95% (critical)
capacity = await hybrid_with_limits.sync_service.check_cloudflare_capacity()
assert capacity['vector_usage_percent'] == 95.0
assert capacity['approaching_limits'] is True
assert len(capacity['warnings']) > 0
assert "CRITICAL" in capacity['warnings'][0]
@pytest.mark.asyncio
async def test_vector_limit_enforcement(self, hybrid_with_limits):
"""Test that sync stops when vector limit is reached."""
# Set vector count at limit
hybrid_with_limits.sync_service.cloudflare_stats['vector_count'] = 100
memory = Memory(
content="Memory that should be rejected",
content_hash=hashlib.sha256(b"rejected").hexdigest(),
tags=["test"]
)
# Validation should fail due to limit
is_valid, error = await hybrid_with_limits.sync_service.validate_memory_for_cloudflare(memory)
assert not is_valid
assert "vector limit" in error.lower()
class TestErrorHandling:
"""Test error handling for various limit scenarios."""
@pytest.mark.asyncio
async def test_limit_error_no_retry(self, hybrid_with_limits):
"""Test that limit errors are not retried."""
operation = SyncOperation(
operation='store',
memory=Memory(content="test", content_hash="hash123", tags=[])
)
# Simulate a limit error
error = Exception("413 Request Entity Too Large: Vector limit exceeded")
await hybrid_with_limits.sync_service._handle_sync_error(error, operation)
# Should not be added to retry queue
assert len(hybrid_with_limits.sync_service.failed_operations) == 0
# Should be marked as failed
assert hybrid_with_limits.sync_service.sync_stats['operations_failed'] == 1
@pytest.mark.asyncio
async def test_temporary_error_retry(self, hybrid_with_limits):
"""Test that temporary errors are retried."""
operation = SyncOperation(
operation='store',
memory=Memory(content="test", content_hash="hash456", tags=[]),
retries=0
)
# Simulate a temporary error
error = Exception("503 Service Temporarily Unavailable")
await hybrid_with_limits.sync_service._handle_sync_error(error, operation)
# Should be added to retry queue
assert len(hybrid_with_limits.sync_service.failed_operations) == 1
# Should not be marked as failed yet
assert hybrid_with_limits.sync_service.sync_stats['operations_failed'] == 0
@pytest.mark.asyncio
async def test_max_retries_reached(self, hybrid_with_limits):
"""Test that operations fail after max retries."""
operation = SyncOperation(
operation='store',
memory=Memory(content="test", content_hash="hash789", tags=[]),
retries=2, # Already retried twice
max_retries=3
)
# Simulate another temporary error
error = Exception("Connection timeout")
await hybrid_with_limits.sync_service._handle_sync_error(error, operation)
# Should not be added to retry queue (max retries reached)
assert len(hybrid_with_limits.sync_service.failed_operations) == 0
# Should be marked as failed
assert hybrid_with_limits.sync_service.sync_stats['operations_failed'] == 1
class TestCapacityMonitoring:
"""Test capacity monitoring and warnings."""
@pytest.mark.asyncio
async def test_capacity_warning_thresholds(self, hybrid_with_limits):
"""Test warning at 80% and critical at 95% thresholds."""
service = hybrid_with_limits.sync_service
# Test 50% - no warning
service.cloudflare_stats['vector_count'] = 50
capacity = await service.check_cloudflare_capacity()
assert not capacity['approaching_limits']
assert len(capacity['warnings']) == 0
# Test 80% - warning
service.cloudflare_stats['vector_count'] = 80
capacity = await service.check_cloudflare_capacity()
assert capacity['approaching_limits']
assert "WARNING" in capacity['warnings'][0]
# Test 95% - critical
service.cloudflare_stats['vector_count'] = 95
capacity = await service.check_cloudflare_capacity()
assert capacity['approaching_limits']
assert "CRITICAL" in capacity['warnings'][0]
@pytest.mark.asyncio
async def test_sync_status_includes_capacity(self, hybrid_with_limits):
"""Test that sync status includes capacity information."""
status = await hybrid_with_limits.sync_service.get_sync_status()
assert 'capacity' in status
assert 'vector_count' in status['capacity']
assert 'vector_limit' in status['capacity']
assert 'approaching_limits' in status['capacity']
assert 'warnings' in status['capacity']
class TestIntegrationScenarios:
"""Test complete scenarios with limit handling."""
@pytest.mark.asyncio
async def test_sync_stops_at_limit(self, hybrid_with_limits):
"""Test that sync gracefully handles reaching the limit."""
# Add memories up to the limit
memories = []
for i in range(105): # Try to exceed limit of 100
memory = Memory(
content=f"Memory {i}",
content_hash=hashlib.sha256(f"hash{i}".encode()).hexdigest(),
tags=["bulk"],
metadata={"index": i}
)
memories.append(memory)
# Process memories
successful = 0
failed = 0
for memory in memories:
operation = SyncOperation(operation='store', memory=memory)
# Validate first
is_valid, _ = await hybrid_with_limits.sync_service.validate_memory_for_cloudflare(memory)
if is_valid:
try:
await hybrid_with_limits.sync_service._process_single_operation(operation)
successful += 1
except Exception:
failed += 1
else:
failed += 1
# Should stop at or before the limit
assert successful <= 100
assert failed >= 5 # At least 5 should fail due to limit
@pytest.mark.asyncio
async def test_periodic_capacity_check(self, hybrid_with_limits):
"""Test that periodic sync checks capacity."""
# Set up near-limit scenario
hybrid_with_limits.sync_service.cloudflare_stats['vector_count'] = 85
# Mock the secondary's get_stats
async def mock_get_stats():
return {"total_memories": 85}
hybrid_with_limits.sync_service.secondary.get_stats = mock_get_stats
# Run periodic sync
await hybrid_with_limits.sync_service._periodic_sync()
# Should have detected approaching limits
assert hybrid_with_limits.sync_service.cloudflare_stats['approaching_limits']
assert len(hybrid_with_limits.sync_service.cloudflare_stats['limit_warnings']) > 0
if __name__ == "__main__":
pytest.main([__file__, "-v"])