test_cache_system.pyβ’14.2 kB
"""
Unit tests for caching system
TDD Red Phase: Write failing tests for caching functionality
"""
import pytest
import asyncio
from unittest.mock import Mock, AsyncMock, patch
from datetime import datetime, timedelta
from typing import Any, Dict, Optional
# Test imports
from src.cache import CacheManager, MemoryCache, RedisCache
from src.exceptions import CacheError
from src.config import get_settings
class TestMemoryCache:
"""Test cases for in-memory caching"""
@pytest.fixture
def memory_cache(self):
"""Create memory cache instance for testing"""
return MemoryCache(max_size=100, default_ttl=3600)
@pytest.mark.asyncio
async def test_memory_cache_initialization(self, memory_cache):
"""Test memory cache initializes correctly"""
assert memory_cache is not None
assert memory_cache.max_size == 100
assert memory_cache.default_ttl == 3600
assert memory_cache.size == 0
@pytest.mark.asyncio
async def test_cache_set_and_get(self, memory_cache):
"""Test basic cache set and get operations"""
key = "test_key"
value = {"data": "test_value", "number": 123}
# Set value in cache
await memory_cache.set(key, value)
# Retrieve value from cache
cached_value = await memory_cache.get(key)
assert cached_value is not None
assert cached_value == value
assert memory_cache.size == 1
@pytest.mark.asyncio
async def test_cache_expiration(self, memory_cache):
"""Test cache expiration functionality"""
key = "expiring_key"
value = {"data": "will_expire"}
ttl = 1 # 1 second
# Set value with short TTL
await memory_cache.set(key, value, ttl=ttl)
# Should be available immediately
cached_value = await memory_cache.get(key)
assert cached_value == value
# Wait for expiration
await asyncio.sleep(1.1)
# Should be expired now
expired_value = await memory_cache.get(key)
assert expired_value is None
assert memory_cache.size == 0
@pytest.mark.asyncio
async def test_cache_max_size_limit(self, memory_cache):
"""Test cache size limitation"""
# Fill cache to max capacity
for i in range(memory_cache.max_size):
await memory_cache.set(f"key_{i}", f"value_{i}")
assert memory_cache.size == memory_cache.max_size
# Add one more item (should evict oldest)
await memory_cache.set("overflow_key", "overflow_value")
# Size should remain at max
assert memory_cache.size == memory_cache.max_size
# Oldest item should be evicted
oldest_value = await memory_cache.get("key_0")
assert oldest_value is None
# Newest item should be present
newest_value = await memory_cache.get("overflow_key")
assert newest_value == "overflow_value"
@pytest.mark.asyncio
async def test_cache_delete(self, memory_cache):
"""Test cache deletion"""
key = "delete_me"
value = {"data": "to_be_deleted"}
# Set and verify
await memory_cache.set(key, value)
assert await memory_cache.get(key) == value
assert memory_cache.size == 1
# Delete and verify
deleted = await memory_cache.delete(key)
assert deleted is True
assert await memory_cache.get(key) is None
assert memory_cache.size == 0
# Try to delete non-existent key
deleted_again = await memory_cache.delete(key)
assert deleted_again is False
@pytest.mark.asyncio
async def test_cache_clear(self, memory_cache):
"""Test cache clearing"""
# Add multiple items
for i in range(10):
await memory_cache.set(f"key_{i}", f"value_{i}")
assert memory_cache.size == 10
# Clear cache
await memory_cache.clear()
assert memory_cache.size == 0
# Verify all items are gone
for i in range(10):
value = await memory_cache.get(f"key_{i}")
assert value is None
class TestRedisCache:
"""Test cases for Redis caching"""
@pytest.fixture
def redis_cache(self):
"""Create Redis cache instance for testing"""
return RedisCache(host="localhost", port=6379, db=1, default_ttl=3600)
@pytest.mark.asyncio
async def test_redis_cache_initialization(self, redis_cache):
"""Test Redis cache initializes correctly"""
assert redis_cache is not None
assert redis_cache.host == "localhost"
assert redis_cache.port == 6379
assert redis_cache.db == 1
assert redis_cache.default_ttl == 3600
@pytest.mark.asyncio
async def test_redis_connection(self, redis_cache):
"""Test Redis connection"""
# This test might be skipped if Redis is not available
try:
await redis_cache.ping()
connection_success = True
except Exception:
pytest.skip("Redis not available for testing")
assert connection_success is True
@pytest.mark.asyncio
async def test_redis_set_and_get(self, redis_cache):
"""Test Redis set and get operations"""
key = "redis_test_key"
value = {"data": "redis_test_value", "timestamp": datetime.now().isoformat()}
try:
# Set value in Redis
await redis_cache.set(key, value)
# Retrieve value from Redis
cached_value = await redis_cache.get(key)
assert cached_value is not None
assert cached_value["data"] == value["data"]
# Clean up
await redis_cache.delete(key)
except Exception as e:
if "connection" in str(e).lower():
pytest.skip("Redis not available for testing")
else:
raise
@pytest.mark.asyncio
async def test_redis_expiration(self, redis_cache):
"""Test Redis key expiration"""
key = "redis_expiring_key"
value = {"data": "will_expire"}
ttl = 2 # 2 seconds
try:
# Set value with TTL
await redis_cache.set(key, value, ttl=ttl)
# Should be available immediately
cached_value = await redis_cache.get(key)
assert cached_value["data"] == value["data"]
# Wait for expiration
await asyncio.sleep(2.1)
# Should be expired now
expired_value = await redis_cache.get(key)
assert expired_value is None
except Exception as e:
if "connection" in str(e).lower():
pytest.skip("Redis not available for testing")
else:
raise
class TestCacheManager:
"""Test cases for cache manager (multi-level caching)"""
@pytest.fixture
def cache_manager(self):
"""Create cache manager with memory and Redis caches"""
memory_cache = MemoryCache(max_size=50, default_ttl=300) # 5 min
# Use real Redis cache in mock mode for testing
redis_cache = RedisCache(
host="localhost",
port=6379,
default_ttl=600,
mock_mode=True # Use mock mode for testing
)
return CacheManager(
l1_cache=memory_cache,
l2_cache=redis_cache
)
@pytest.mark.asyncio
async def test_cache_manager_initialization(self, cache_manager):
"""Test cache manager initializes correctly"""
assert cache_manager is not None
assert cache_manager.l1_cache is not None
assert cache_manager.l2_cache is not None
assert cache_manager.enable_l1 is True
assert cache_manager.enable_l2 is True
@pytest.mark.asyncio
async def test_multi_level_cache_get(self, cache_manager):
"""Test multi-level cache retrieval"""
key = "multi_level_key"
value = {"data": "multi_level_value"}
# Set in L1 cache only
await cache_manager.l1_cache.set(key, value)
# Should retrieve from L1
cached_value = await cache_manager.get(key)
assert cached_value == value
# L2 cache should not have been called
cache_manager.l2_cache.get.assert_not_called()
@pytest.mark.asyncio
async def test_cache_fallback_to_l2(self, cache_manager):
"""Test fallback to L2 cache when L1 misses"""
key = "l2_fallback_key"
value = {"data": "from_l2_cache"}
# Ensure L2 cache is connected
await cache_manager.l2_cache.connect()
# Directly set value in L2 cache
set_result = await cache_manager.l2_cache.set(key, value)
assert set_result == True
# Verify value is in L2 cache
l2_direct_value = await cache_manager.l2_cache.get(key)
assert l2_direct_value == value
# Ensure L1 cache is empty for this key
await cache_manager.l1_cache.delete(key)
# Should fallback to L2 and populate L1
cached_value = await cache_manager.get(key)
assert cached_value == value
# Verify cache statistics show L2 hit
stats = cache_manager.stats()
assert stats["l2_hits"] >= 1
@pytest.mark.asyncio
async def test_cache_set_both_levels(self, cache_manager):
"""Test setting value in both cache levels"""
key = "both_levels_key"
value = {"data": "both_levels_value"}
ttl = 1800
# Ensure cache manager is properly initialized
await cache_manager._ensure_background_tasks()
# Set value through cache manager
result = await cache_manager.set(key, value, ttl=ttl)
assert result == True
# Test L1 cache directly first
l1_direct = await cache_manager.l1_cache.set(key + "_direct", value, ttl)
assert l1_direct == True
l1_direct_get = await cache_manager.l1_cache.get(key + "_direct")
assert l1_direct_get == value
# Should be set in L1 cache
l1_value = await cache_manager.l1_cache.get(key)
if l1_value != value:
# Debug information
print(f"L1 value: {l1_value}")
print(f"Expected: {value}")
print(f"L1 cache stats: {cache_manager.l1_cache.stats()}")
assert l1_value == value
# Should also be set in L2 cache
l2_value = await cache_manager.l2_cache.get(key)
assert l2_value == value
@pytest.mark.asyncio
async def test_cache_invalidation(self, cache_manager):
"""Test cache invalidation across both levels"""
key = "invalidate_key"
value = {"data": "to_be_invalidated"}
# Set in both levels
await cache_manager.set(key, value)
# Invalidate
await cache_manager.delete(key)
# Should be removed from L1
l1_value = await cache_manager.l1_cache.get(key)
assert l1_value is None
# Should also be removed from L2
cache_manager.l2_cache.delete.assert_called_once_with(key)
@pytest.mark.asyncio
async def test_cache_key_generation(self, cache_manager):
"""Test cache key generation for different data types"""
# Test with string
string_key = cache_manager.generate_cache_key("company_info", "005930")
assert "company_info:005930" in string_key
# Test with multiple parameters
multi_param_key = cache_manager.generate_cache_key(
"financial_data", "005930", year=2023, quarter=4
)
assert "financial_data:005930" in multi_param_key
assert "year=2023" in multi_param_key
assert "quarter=4" in multi_param_key
# Test with complex objects
complex_key = cache_manager.generate_key(
"analysis", {"company": "005930", "type": "peer_comparison"}
)
assert "analysis:" in complex_key
assert len(complex_key) > 20 # Should be reasonably long hash
@pytest.mark.asyncio
async def test_cache_statistics(self, cache_manager):
"""Test cache statistics and monitoring"""
# Perform various cache operations
await cache_manager.set("key1", "value1")
await cache_manager.set("key2", "value2")
await cache_manager.get("key1") # Hit
await cache_manager.get("key3") # Miss
stats = cache_manager.stats()
assert "hits" in stats
assert "misses" in stats
assert "sets" in stats
assert "hit_rate" in stats
assert stats["sets"] == 2
assert stats["hits"] >= 1
assert stats["misses"] >= 1
assert 0 <= stats["hit_rate"] <= 100
class TestCacheIntegration:
"""Integration tests for caching with actual data collectors"""
@pytest.fixture
def cached_dart_collector(self):
"""Create DART collector with caching enabled"""
# This will need to be implemented
pass
@pytest.mark.asyncio
async def test_company_data_caching(self):
"""Test that company data is properly cached"""
# Test will be implemented with actual DART collector
pass
@pytest.mark.asyncio
async def test_cache_warming(self):
"""Test cache warming strategies"""
# Test will be implemented for preloading popular data
pass
@pytest.mark.asyncio
async def test_cache_refresh_strategies(self):
"""Test different cache refresh strategies"""
# Test will be implemented for background refresh
pass
if __name__ == "__main__":
# Run tests with pytest
pytest.main([__file__, "-v"])