Skip to main content
Glama
test_cache.py (11.3 kB)
"""Unit tests for Redis cache manager.""" import json import pytest from unittest.mock import AsyncMock, MagicMock from src.cache.redis_cache import CacheManager class TestCacheManager: """Test suite for cache manager.""" def test_initialization(self): """Test cache manager initialization.""" cache = CacheManager( host="localhost", port=6379, db=0, password="secret", ssl=True ) assert cache.host == "localhost" assert cache.port == 6379 assert cache.db == 0 assert cache.password == "secret" assert cache.ssl is True assert cache.client is None @pytest.mark.asyncio async def test_connect_success(self): """Test successful Redis connection.""" cache = CacheManager() # Mock Redis client mock_client = AsyncMock() mock_client.ping = AsyncMock() with pytest.mock.patch('redis.asyncio.Redis', return_value=mock_client): await cache.connect() assert cache.client is not None mock_client.ping.assert_called_once() @pytest.mark.asyncio async def test_connect_failure(self): """Test Redis connection failure.""" cache = CacheManager() # Mock Redis client that fails to connect with pytest.mock.patch('redis.asyncio.Redis', side_effect=Exception("Connection failed")): with pytest.raises(Exception, match="Connection failed"): await cache.connect() @pytest.mark.asyncio async def test_close(self, cache_manager): """Test closing Redis connection.""" await cache_manager.close() cache_manager.client.close.assert_called_once() @pytest.mark.asyncio async def test_get_cache_hit(self, cache_manager): """Test getting value from cache (cache hit).""" test_data = {"key": "value", "number": 123} cache_manager.client.get = AsyncMock(return_value=json.dumps(test_data)) result = await cache_manager.get("test_key") assert result == test_data cache_manager.client.get.assert_called_once_with("test_key") @pytest.mark.asyncio async def test_get_cache_miss(self, cache_manager): """Test getting value from cache (cache miss).""" cache_manager.client.get = AsyncMock(return_value=None) result = await 
cache_manager.get("nonexistent_key") assert result is None cache_manager.client.get.assert_called_once_with("nonexistent_key") @pytest.mark.asyncio async def test_get_without_connection(self): """Test getting value when not connected.""" cache = CacheManager() # Don't connect result = await cache.get("test_key") assert result is None @pytest.mark.asyncio async def test_get_error_handling(self, cache_manager): """Test error handling during get operation.""" cache_manager.client.get = AsyncMock(side_effect=Exception("Redis error")) result = await cache_manager.get("test_key") # Should return None on error assert result is None @pytest.mark.asyncio async def test_set_success(self, cache_manager): """Test setting value in cache.""" test_data = {"key": "value"} cache_manager.client.setex = AsyncMock() result = await cache_manager.set("test_key", test_data, ttl=3600) assert result is True cache_manager.client.setex.assert_called_once() call_args = cache_manager.client.setex.call_args assert call_args[0][0] == "test_key" assert call_args[0][1] == 3600 assert json.loads(call_args[0][2]) == test_data @pytest.mark.asyncio async def test_set_with_custom_ttl(self, cache_manager): """Test setting value with custom TTL.""" cache_manager.client.setex = AsyncMock() await cache_manager.set("test_key", {"data": "value"}, ttl=7200) call_args = cache_manager.client.setex.call_args assert call_args[0][1] == 7200 @pytest.mark.asyncio async def test_set_without_connection(self): """Test setting value when not connected.""" cache = CacheManager() # Don't connect result = await cache.set("test_key", {"data": "value"}) assert result is False @pytest.mark.asyncio async def test_set_error_handling(self, cache_manager): """Test error handling during set operation.""" cache_manager.client.setex = AsyncMock(side_effect=Exception("Redis error")) result = await cache_manager.set("test_key", {"data": "value"}) # Should return False on error assert result is False @pytest.mark.asyncio async def 
test_delete_success(self, cache_manager): """Test deleting key from cache.""" cache_manager.client.delete = AsyncMock() result = await cache_manager.delete("test_key") assert result is True cache_manager.client.delete.assert_called_once_with("test_key") @pytest.mark.asyncio async def test_delete_without_connection(self): """Test deleting key when not connected.""" cache = CacheManager() # Don't connect result = await cache.delete("test_key") assert result is False @pytest.mark.asyncio async def test_delete_error_handling(self, cache_manager): """Test error handling during delete operation.""" cache_manager.client.delete = AsyncMock(side_effect=Exception("Redis error")) result = await cache_manager.delete("test_key") # Should return False on error assert result is False @pytest.mark.asyncio async def test_exists_true(self, cache_manager): """Test checking if key exists (exists).""" cache_manager.client.exists = AsyncMock(return_value=1) result = await cache_manager.exists("test_key") assert result is True cache_manager.client.exists.assert_called_once_with("test_key") @pytest.mark.asyncio async def test_exists_false(self, cache_manager): """Test checking if key exists (doesn't exist).""" cache_manager.client.exists = AsyncMock(return_value=0) result = await cache_manager.exists("nonexistent_key") assert result is False @pytest.mark.asyncio async def test_exists_without_connection(self): """Test checking existence when not connected.""" cache = CacheManager() # Don't connect result = await cache.exists("test_key") assert result is False @pytest.mark.asyncio async def test_exists_error_handling(self, cache_manager): """Test error handling during exists operation.""" cache_manager.client.exists = AsyncMock(side_effect=Exception("Redis error")) result = await cache_manager.exists("test_key") # Should return False on error assert result is False @pytest.mark.asyncio async def test_set_and_get_complex_data(self, cache_manager): """Test setting and getting complex data 
structures.""" complex_data = { "string": "value", "number": 123, "float": 45.67, "boolean": True, "null": None, "array": [1, 2, 3], "nested": {"key": "value"} } # Mock set cache_manager.client.setex = AsyncMock() await cache_manager.set("complex_key", complex_data) # Mock get cache_manager.client.get = AsyncMock(return_value=json.dumps(complex_data)) result = await cache_manager.get("complex_key") assert result == complex_data @pytest.mark.asyncio async def test_json_serialization_error(self, cache_manager): """Test handling of non-serializable data.""" # Create non-serializable object class NonSerializable: pass cache_manager.client.setex = AsyncMock() # Should handle serialization error result = await cache_manager.set("test_key", NonSerializable()) assert result is False @pytest.mark.asyncio async def test_json_deserialization_error(self, cache_manager): """Test handling of invalid JSON in cache.""" cache_manager.client.get = AsyncMock(return_value="invalid json {") result = await cache_manager.get("test_key") # Should return None on deserialization error assert result is None @pytest.mark.asyncio async def test_multiple_operations(self, cache_manager): """Test multiple cache operations in sequence.""" # Set cache_manager.client.setex = AsyncMock() await cache_manager.set("key1", {"data": "value1"}) # Get cache_manager.client.get = AsyncMock(return_value=json.dumps({"data": "value1"})) result = await cache_manager.get("key1") assert result == {"data": "value1"} # Exists cache_manager.client.exists = AsyncMock(return_value=1) exists = await cache_manager.exists("key1") assert exists is True # Delete cache_manager.client.delete = AsyncMock() deleted = await cache_manager.delete("key1") assert deleted is True @pytest.mark.asyncio async def test_default_ttl(self, cache_manager): """Test that default TTL is used when not specified.""" cache_manager.client.setex = AsyncMock() await cache_manager.set("test_key", {"data": "value"}) call_args = 
cache_manager.client.setex.call_args assert call_args[0][1] == 3600 # Default TTL @pytest.mark.asyncio async def test_empty_string_value(self, cache_manager): """Test caching empty string value.""" cache_manager.client.setex = AsyncMock() cache_manager.client.get = AsyncMock(return_value=json.dumps("")) await cache_manager.set("test_key", "") result = await cache_manager.get("test_key") assert result == "" @pytest.mark.asyncio async def test_zero_value(self, cache_manager): """Test caching zero value.""" cache_manager.client.setex = AsyncMock() cache_manager.client.get = AsyncMock(return_value=json.dumps(0)) await cache_manager.set("test_key", 0) result = await cache_manager.get("test_key") assert result == 0 @pytest.mark.asyncio async def test_false_value(self, cache_manager): """Test caching False boolean value.""" cache_manager.client.setex = AsyncMock() cache_manager.client.get = AsyncMock(return_value=json.dumps(False)) await cache_manager.set("test_key", False) result = await cache_manager.get("test_key") assert result is False

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/CTD-Techs/CTD-MCP'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.