Skip to main content
Glama

MCP Stock Details Server

by whdghk1907
test_cache_system.py • 14.2 kB
"""
Unit tests for caching system
TDD Red Phase: Write failing tests for caching functionality
"""

import pytest
import asyncio
from unittest.mock import Mock, AsyncMock, patch
from datetime import datetime, timedelta
from typing import Any, Dict, Optional

# Test imports
from src.cache import CacheManager, MemoryCache, RedisCache
from src.exceptions import CacheError
from src.config import get_settings


class TestMemoryCache:
    """Test cases for in-memory caching"""

    @pytest.fixture
    def memory_cache(self):
        """Create memory cache instance for testing"""
        return MemoryCache(max_size=100, default_ttl=3600)

    @pytest.mark.asyncio
    async def test_memory_cache_initialization(self, memory_cache):
        """Test memory cache initializes correctly"""
        assert memory_cache is not None
        assert memory_cache.max_size == 100
        assert memory_cache.default_ttl == 3600
        assert memory_cache.size == 0

    @pytest.mark.asyncio
    async def test_cache_set_and_get(self, memory_cache):
        """Test basic cache set and get operations"""
        key = "test_key"
        value = {"data": "test_value", "number": 123}

        # Set value in cache
        await memory_cache.set(key, value)

        # Retrieve value from cache
        cached_value = await memory_cache.get(key)

        assert cached_value is not None
        assert cached_value == value
        assert memory_cache.size == 1

    @pytest.mark.asyncio
    async def test_cache_expiration(self, memory_cache):
        """Test cache expiration functionality"""
        key = "expiring_key"
        value = {"data": "will_expire"}
        ttl = 1  # 1 second

        # Set value with short TTL
        await memory_cache.set(key, value, ttl=ttl)

        # Should be available immediately
        cached_value = await memory_cache.get(key)
        assert cached_value == value

        # Wait for expiration
        await asyncio.sleep(1.1)

        # Should be expired now
        expired_value = await memory_cache.get(key)
        assert expired_value is None
        assert memory_cache.size == 0

    @pytest.mark.asyncio
    async def test_cache_max_size_limit(self, memory_cache):
        """Test cache size limitation"""
        # Fill cache to max capacity
        for i in range(memory_cache.max_size):
            await memory_cache.set(f"key_{i}", f"value_{i}")

        assert memory_cache.size == memory_cache.max_size

        # Add one more item (should evict oldest)
        await memory_cache.set("overflow_key", "overflow_value")

        # Size should remain at max
        assert memory_cache.size == memory_cache.max_size

        # Oldest item should be evicted
        oldest_value = await memory_cache.get("key_0")
        assert oldest_value is None

        # Newest item should be present
        newest_value = await memory_cache.get("overflow_key")
        assert newest_value == "overflow_value"

    @pytest.mark.asyncio
    async def test_cache_delete(self, memory_cache):
        """Test cache deletion"""
        key = "delete_me"
        value = {"data": "to_be_deleted"}

        # Set and verify
        await memory_cache.set(key, value)
        assert await memory_cache.get(key) == value
        assert memory_cache.size == 1

        # Delete and verify
        deleted = await memory_cache.delete(key)
        assert deleted is True
        assert await memory_cache.get(key) is None
        assert memory_cache.size == 0

        # Try to delete non-existent key
        deleted_again = await memory_cache.delete(key)
        assert deleted_again is False

    @pytest.mark.asyncio
    async def test_cache_clear(self, memory_cache):
        """Test cache clearing"""
        # Add multiple items
        for i in range(10):
            await memory_cache.set(f"key_{i}", f"value_{i}")

        assert memory_cache.size == 10

        # Clear cache
        await memory_cache.clear()
        assert memory_cache.size == 0

        # Verify all items are gone
        for i in range(10):
            value = await memory_cache.get(f"key_{i}")
            assert value is None


class TestRedisCache:
    """Test cases for Redis caching"""

    @pytest.fixture
    def redis_cache(self):
        """Create Redis cache instance for testing"""
        return RedisCache(host="localhost", port=6379, db=1, default_ttl=3600)

    @pytest.mark.asyncio
    async def test_redis_cache_initialization(self, redis_cache):
        """Test Redis cache initializes correctly"""
        assert redis_cache is not None
        assert redis_cache.host == "localhost"
        assert redis_cache.port == 6379
        assert redis_cache.db == 1
        assert redis_cache.default_ttl == 3600

    @pytest.mark.asyncio
    async def test_redis_connection(self, redis_cache):
        """Test Redis connection"""
        # This test might be skipped if Redis is not available
        try:
            await redis_cache.ping()
            connection_success = True
        except Exception:
            pytest.skip("Redis not available for testing")

        assert connection_success is True

    @pytest.mark.asyncio
    async def test_redis_set_and_get(self, redis_cache):
        """Test Redis set and get operations"""
        key = "redis_test_key"
        value = {"data": "redis_test_value", "timestamp": datetime.now().isoformat()}

        try:
            # Set value in Redis
            await redis_cache.set(key, value)

            # Retrieve value from Redis
            cached_value = await redis_cache.get(key)

            assert cached_value is not None
            assert cached_value["data"] == value["data"]

            # Clean up
            await redis_cache.delete(key)

        except Exception as e:
            # Only connection problems are treated as "Redis unavailable";
            # anything else (including assertion failures) is re-raised.
            if "connection" in str(e).lower():
                pytest.skip("Redis not available for testing")
            else:
                raise

    @pytest.mark.asyncio
    async def test_redis_expiration(self, redis_cache):
        """Test Redis key expiration"""
        key = "redis_expiring_key"
        value = {"data": "will_expire"}
        ttl = 2  # 2 seconds

        try:
            # Set value with TTL
            await redis_cache.set(key, value, ttl=ttl)

            # Should be available immediately
            cached_value = await redis_cache.get(key)
            assert cached_value["data"] == value["data"]

            # Wait for expiration
            await asyncio.sleep(2.1)

            # Should be expired now
            expired_value = await redis_cache.get(key)
            assert expired_value is None

        except Exception as e:
            # Only connection problems are treated as "Redis unavailable";
            # anything else (including assertion failures) is re-raised.
            if "connection" in str(e).lower():
                pytest.skip("Redis not available for testing")
            else:
                raise


class TestCacheManager:
    """Test cases for cache manager (multi-level caching)"""

    @pytest.fixture
    def cache_manager(self):
        """Create cache manager with memory and Redis caches"""
        memory_cache = MemoryCache(max_size=50, default_ttl=300)  # 5 min

        # Use real Redis cache in mock mode for testing
        redis_cache = RedisCache(
            host="localhost",
            port=6379,
            default_ttl=600,
            mock_mode=True  # Use mock mode for testing
        )

        return CacheManager(
            l1_cache=memory_cache,
            l2_cache=redis_cache
        )

    @pytest.mark.asyncio
    async def test_cache_manager_initialization(self, cache_manager):
        """Test cache manager initializes correctly"""
        assert cache_manager is not None
        assert cache_manager.l1_cache is not None
        assert cache_manager.l2_cache is not None
        assert cache_manager.enable_l1 is True
        assert cache_manager.enable_l2 is True

    @pytest.mark.asyncio
    async def test_multi_level_cache_get(self, cache_manager):
        """Test multi-level cache retrieval"""
        key = "multi_level_key"
        value = {"data": "multi_level_value"}

        # Set in L1 cache only
        await cache_manager.l1_cache.set(key, value)

        # The fixture's L2 cache is a real RedisCache instance (mock mode),
        # not a Mock, so its ``get`` has no ``assert_not_called``. Replace it
        # with an AsyncMock spy for the duration of the lookup instead.
        with patch.object(
            cache_manager.l2_cache, "get", new_callable=AsyncMock
        ) as l2_get:
            # Should retrieve from L1
            cached_value = await cache_manager.get(key)

        assert cached_value == value

        # L2 cache should not have been called
        l2_get.assert_not_called()

    @pytest.mark.asyncio
    async def test_cache_fallback_to_l2(self, cache_manager):
        """Test fallback to L2 cache when L1 misses"""
        key = "l2_fallback_key"
        value = {"data": "from_l2_cache"}

        # Ensure L2 cache is connected
        await cache_manager.l2_cache.connect()

        # Directly set value in L2 cache
        set_result = await cache_manager.l2_cache.set(key, value)
        assert set_result == True

        # Verify value is in L2 cache
        l2_direct_value = await cache_manager.l2_cache.get(key)
        assert l2_direct_value == value

        # Ensure L1 cache is empty for this key
        await cache_manager.l1_cache.delete(key)

        # Should fallback to L2 and populate L1
        cached_value = await cache_manager.get(key)
        assert cached_value == value

        # Verify cache statistics show L2 hit
        stats = cache_manager.stats()
        assert stats["l2_hits"] >= 1

    @pytest.mark.asyncio
    async def test_cache_set_both_levels(self, cache_manager):
        """Test setting value in both cache levels"""
        key = "both_levels_key"
        value = {"data": "both_levels_value"}
        ttl = 1800

        # Ensure cache manager is properly initialized
        await cache_manager._ensure_background_tasks()

        # Set value through cache manager
        result = await cache_manager.set(key, value, ttl=ttl)
        assert result == True

        # Test L1 cache directly first
        l1_direct = await cache_manager.l1_cache.set(key + "_direct", value, ttl)
        assert l1_direct == True

        l1_direct_get = await cache_manager.l1_cache.get(key + "_direct")
        assert l1_direct_get == value

        # Should be set in L1 cache
        l1_value = await cache_manager.l1_cache.get(key)
        if l1_value != value:
            # Debug information
            print(f"L1 value: {l1_value}")
            print(f"Expected: {value}")
            print(f"L1 cache stats: {cache_manager.l1_cache.stats()}")
        assert l1_value == value

        # Should also be set in L2 cache
        l2_value = await cache_manager.l2_cache.get(key)
        assert l2_value == value

    @pytest.mark.asyncio
    async def test_cache_invalidation(self, cache_manager):
        """Test cache invalidation across both levels"""
        key = "invalidate_key"
        value = {"data": "to_be_invalidated"}

        # Set in both levels
        await cache_manager.set(key, value)

        # The fixture's L2 cache is a real RedisCache instance (mock mode),
        # not a Mock, so spy on ``delete`` with an AsyncMock that forwards to
        # the real method; the deletion still happens and the call is recorded.
        with patch.object(
            cache_manager.l2_cache,
            "delete",
            new_callable=AsyncMock,
            wraps=cache_manager.l2_cache.delete,
        ) as l2_delete:
            # Invalidate
            await cache_manager.delete(key)

        # Should be removed from L1
        l1_value = await cache_manager.l1_cache.get(key)
        assert l1_value is None

        # Should also be removed from L2
        l2_delete.assert_called_once_with(key)

    @pytest.mark.asyncio
    async def test_cache_key_generation(self, cache_manager):
        """Test cache key generation for different data types"""
        # Test with string
        string_key = cache_manager.generate_cache_key("company_info", "005930")
        assert "company_info:005930" in string_key

        # Test with multiple parameters
        multi_param_key = cache_manager.generate_cache_key(
            "financial_data",
            "005930",
            year=2023,
            quarter=4
        )
        assert "financial_data:005930" in multi_param_key
        assert "year=2023" in multi_param_key
        assert "quarter=4" in multi_param_key

        # Test with complex objects
        # NOTE(review): this call uses ``generate_key`` while the two above use
        # ``generate_cache_key`` - confirm both exist on CacheManager or unify.
        complex_key = cache_manager.generate_key(
            "analysis",
            {"company": "005930", "type": "peer_comparison"}
        )
        assert "analysis:" in complex_key
        assert len(complex_key) > 20  # Should be reasonably long hash

    @pytest.mark.asyncio
    async def test_cache_statistics(self, cache_manager):
        """Test cache statistics and monitoring"""
        # Perform various cache operations
        await cache_manager.set("key1", "value1")
        await cache_manager.set("key2", "value2")
        await cache_manager.get("key1")  # Hit
        await cache_manager.get("key3")  # Miss

        stats = cache_manager.stats()

        assert "hits" in stats
        assert "misses" in stats
        assert "sets" in stats
        assert "hit_rate" in stats
        assert stats["sets"] == 2
        assert stats["hits"] >= 1
        assert stats["misses"] >= 1
        assert 0 <= stats["hit_rate"] <= 100


class TestCacheIntegration:
    """Integration tests for caching with actual data collectors"""

    @pytest.fixture
    def cached_dart_collector(self):
        """Create DART collector with caching enabled"""
        # This will need to be implemented
        pass

    @pytest.mark.asyncio
    async def test_company_data_caching(self):
        """Test that company data is properly cached"""
        # Test will be implemented with actual DART collector
        pass

    @pytest.mark.asyncio
    async def test_cache_warming(self):
        """Test cache warming strategies"""
        # Test will be implemented for preloading popular data
        pass

    @pytest.mark.asyncio
    async def test_cache_refresh_strategies(self):
        """Test different cache refresh strategies"""
        # Test will be implemented for background refresh
        pass


if __name__ == "__main__":
    # Run tests with pytest
    pytest.main([__file__, "-v"])

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/whdghk1907/mcp-stock-details'

If you have feedback or need assistance with the MCP directory API, please join our Discord server