Skip to main content
Glama

Redis MCP Server

Official
by redis
test_pub_sub.py (11.8 kB)
"""
Unit tests for src/tools/pub_sub.py
"""

import inspect
from unittest.mock import Mock, patch

import pytest
from redis.exceptions import ConnectionError, RedisError

from src.tools.pub_sub import publish, subscribe, unsubscribe


class TestPubSubOperations:
    """Test cases for Redis pub/sub operations.

    Most tests take the ``mock_redis_connection_manager`` fixture, which is
    defined outside this module (presumably in ``conftest.py`` -- verify) and
    is used here as the mocked Redis client that the tools under test talk to.
    Each tool returns a status string; the tests assert on substrings of it.
    """

    # ------------------------------------------------------------------
    # publish
    # ------------------------------------------------------------------

    @pytest.mark.asyncio
    async def test_publish_success(self, mock_redis_connection_manager):
        """Test successful publish operation."""
        mock_redis = mock_redis_connection_manager
        mock_redis.publish.return_value = (
            2  # Number of subscribers that received the message
        )

        result = await publish("test_channel", "Hello World")

        mock_redis.publish.assert_called_once_with("test_channel", "Hello World")
        assert "Message published to channel 'test_channel'" in result

    @pytest.mark.asyncio
    async def test_publish_no_subscribers(self, mock_redis_connection_manager):
        """Test publish operation with no subscribers."""
        mock_redis = mock_redis_connection_manager
        mock_redis.publish.return_value = 0  # No subscribers

        result = await publish("empty_channel", "Hello World")

        mock_redis.publish.assert_called_once_with("empty_channel", "Hello World")
        assert "Message published to channel 'empty_channel'" in result

    @pytest.mark.asyncio
    async def test_publish_redis_error(self, mock_redis_connection_manager):
        """Test publish operation with Redis error."""
        mock_redis = mock_redis_connection_manager
        mock_redis.publish.side_effect = RedisError("Connection failed")

        result = await publish("test_channel", "Hello World")

        # The error is reported in the returned string rather than raised.
        assert (
            "Error publishing message to channel 'test_channel': Connection failed"
            in result
        )

    @pytest.mark.asyncio
    async def test_publish_connection_error(self, mock_redis_connection_manager):
        """Test publish operation with connection error."""
        mock_redis = mock_redis_connection_manager
        mock_redis.publish.side_effect = ConnectionError("Redis server unavailable")

        result = await publish("test_channel", "Hello World")

        assert (
            "Error publishing message to channel 'test_channel': Redis server unavailable"
            in result
        )

    @pytest.mark.asyncio
    async def test_publish_empty_message(self, mock_redis_connection_manager):
        """Test publish operation with empty message."""
        mock_redis = mock_redis_connection_manager
        mock_redis.publish.return_value = 1

        result = await publish("test_channel", "")

        mock_redis.publish.assert_called_once_with("test_channel", "")
        assert "Message published to channel 'test_channel'" in result

    @pytest.mark.asyncio
    async def test_publish_numeric_message(self, mock_redis_connection_manager):
        """Test publish operation with numeric message."""
        mock_redis = mock_redis_connection_manager
        mock_redis.publish.return_value = 1

        result = await publish("test_channel", 42)

        # The message is expected to reach Redis unchanged (still an int).
        mock_redis.publish.assert_called_once_with("test_channel", 42)
        assert "Message published to channel 'test_channel'" in result

    @pytest.mark.asyncio
    async def test_publish_json_message(self, mock_redis_connection_manager):
        """Test publish operation with JSON-like message."""
        mock_redis = mock_redis_connection_manager
        mock_redis.publish.return_value = 3

        json_message = (
            '{"type": "notification", "data": {"user": "john", "action": "login"}}'
        )
        result = await publish("notifications", json_message)

        mock_redis.publish.assert_called_once_with("notifications", json_message)
        assert "Message published to channel 'notifications'" in result

    @pytest.mark.asyncio
    async def test_publish_unicode_message(self, mock_redis_connection_manager):
        """Test publish operation with unicode message."""
        mock_redis = mock_redis_connection_manager
        mock_redis.publish.return_value = 1

        unicode_message = "Hello 世界 🌍"
        result = await publish("test_channel", unicode_message)

        mock_redis.publish.assert_called_once_with("test_channel", unicode_message)
        assert "Message published to channel 'test_channel'" in result

    # ------------------------------------------------------------------
    # subscribe
    # ------------------------------------------------------------------

    @pytest.mark.asyncio
    async def test_subscribe_success(self, mock_redis_connection_manager):
        """Test successful subscribe operation."""
        mock_redis = mock_redis_connection_manager
        mock_pubsub = Mock()
        mock_redis.pubsub.return_value = mock_pubsub
        mock_pubsub.subscribe.return_value = None

        result = await subscribe("test_channel")

        mock_redis.pubsub.assert_called_once()
        mock_pubsub.subscribe.assert_called_once_with("test_channel")
        assert "Subscribed to channel 'test_channel'" in result

    @pytest.mark.asyncio
    async def test_subscribe_redis_error(self, mock_redis_connection_manager):
        """Test subscribe operation with Redis error."""
        mock_redis = mock_redis_connection_manager
        # Fail while creating the PubSub object, before subscribe() is reached.
        mock_redis.pubsub.side_effect = RedisError("Connection failed")

        result = await subscribe("test_channel")

        assert (
            "Error subscribing to channel 'test_channel': Connection failed" in result
        )

    @pytest.mark.asyncio
    async def test_subscribe_pubsub_error(self, mock_redis_connection_manager):
        """Test subscribe operation when the PubSub subscribe call fails."""
        mock_redis = mock_redis_connection_manager
        mock_pubsub = Mock()
        mock_redis.pubsub.return_value = mock_pubsub
        mock_pubsub.subscribe.side_effect = RedisError("Subscribe failed")

        result = await subscribe("test_channel")

        assert "Error subscribing to channel 'test_channel': Subscribe failed" in result

    @pytest.mark.asyncio
    async def test_subscribe_multiple_channels_pattern(
        self, mock_redis_connection_manager
    ):
        """Test subscribe operation with pattern-like channel name."""
        mock_redis = mock_redis_connection_manager
        mock_pubsub = Mock()
        mock_redis.pubsub.return_value = mock_pubsub
        mock_pubsub.subscribe.return_value = None

        pattern_channel = "notifications:*"
        result = await subscribe(pattern_channel)

        # The name is passed to subscribe() verbatim, glob character included.
        mock_pubsub.subscribe.assert_called_once_with(pattern_channel)
        assert f"Subscribed to channel '{pattern_channel}'" in result

    @pytest.mark.asyncio
    async def test_subscribe_with_special_characters(
        self, mock_redis_connection_manager
    ):
        """Test subscribe operation with special characters in channel name."""
        mock_redis = mock_redis_connection_manager
        mock_pubsub = Mock()
        mock_redis.pubsub.return_value = mock_pubsub
        mock_pubsub.subscribe.return_value = None

        special_channel = "channel:with:colons-and-dashes_and_underscores"
        result = await subscribe(special_channel)

        mock_pubsub.subscribe.assert_called_once_with(special_channel)
        assert f"Subscribed to channel '{special_channel}'" in result

    # ------------------------------------------------------------------
    # unsubscribe
    # ------------------------------------------------------------------

    @pytest.mark.asyncio
    async def test_unsubscribe_success(self, mock_redis_connection_manager):
        """Test successful unsubscribe operation."""
        mock_redis = mock_redis_connection_manager
        mock_pubsub = Mock()
        mock_redis.pubsub.return_value = mock_pubsub
        mock_pubsub.unsubscribe.return_value = None

        result = await unsubscribe("test_channel")

        mock_redis.pubsub.assert_called_once()
        mock_pubsub.unsubscribe.assert_called_once_with("test_channel")
        assert "Unsubscribed from channel 'test_channel'" in result

    @pytest.mark.asyncio
    async def test_unsubscribe_redis_error(self, mock_redis_connection_manager):
        """Test unsubscribe operation with Redis error."""
        mock_redis = mock_redis_connection_manager
        # Fail while creating the PubSub object, before unsubscribe() is reached.
        mock_redis.pubsub.side_effect = RedisError("Connection failed")

        result = await unsubscribe("test_channel")

        assert (
            "Error unsubscribing from channel 'test_channel': Connection failed"
            in result
        )

    @pytest.mark.asyncio
    async def test_unsubscribe_pubsub_error(self, mock_redis_connection_manager):
        """Test unsubscribe operation with pubsub error."""
        mock_redis = mock_redis_connection_manager
        mock_pubsub = Mock()
        mock_redis.pubsub.return_value = mock_pubsub
        mock_pubsub.unsubscribe.side_effect = RedisError("Unsubscribe failed")

        result = await unsubscribe("test_channel")

        assert (
            "Error unsubscribing from channel 'test_channel': Unsubscribe failed"
            in result
        )

    @pytest.mark.asyncio
    async def test_unsubscribe_from_all_channels(self, mock_redis_connection_manager):
        """Test unsubscribe operation with an explicitly named channel.

        NOTE: despite the test name, this does not exercise an
        "unsubscribe from all" path. ``unsubscribe`` takes a single
        ``channel`` parameter (see ``test_function_signatures``), so only the
        explicit-channel case is covered. The name is kept so existing test
        IDs stay stable.
        """
        mock_redis = mock_redis_connection_manager
        mock_pubsub = Mock()
        mock_redis.pubsub.return_value = mock_pubsub
        mock_pubsub.unsubscribe.return_value = None

        # Test unsubscribing from specific channel
        result = await unsubscribe("specific_channel")

        mock_pubsub.unsubscribe.assert_called_once_with("specific_channel")
        assert "Unsubscribed from channel 'specific_channel'" in result

    # ------------------------------------------------------------------
    # misc
    # ------------------------------------------------------------------

    @pytest.mark.asyncio
    async def test_publish_to_pattern_channel(self, mock_redis_connection_manager):
        """Test publish operation to pattern-like channel."""
        mock_redis = mock_redis_connection_manager
        mock_redis.publish.return_value = 5

        pattern_channel = "user:123:notifications"
        result = await publish(pattern_channel, "User notification")

        mock_redis.publish.assert_called_once_with(pattern_channel, "User notification")
        assert f"Message published to channel '{pattern_channel}'" in result

    @pytest.mark.asyncio
    async def test_connection_manager_called_correctly(self):
        """Test that RedisConnectionManager.get_connection is called correctly."""
        # Patch the connection manager directly instead of using the fixture,
        # so the number of get_connection() calls can be asserted.
        with patch(
            "src.tools.pub_sub.RedisConnectionManager.get_connection"
        ) as mock_get_conn:
            mock_redis = Mock()
            mock_redis.publish.return_value = 1
            mock_get_conn.return_value = mock_redis

            await publish("test_channel", "test_message")

            mock_get_conn.assert_called_once()

    @pytest.mark.asyncio
    async def test_function_signatures(self):
        """Test that functions have correct signatures."""
        # Test publish function signature
        publish_sig = inspect.signature(publish)
        publish_params = list(publish_sig.parameters.keys())
        assert publish_params == ["channel", "message"]

        # Test subscribe function signature
        subscribe_sig = inspect.signature(subscribe)
        subscribe_params = list(subscribe_sig.parameters.keys())
        assert subscribe_params == ["channel"]

        # Test unsubscribe function signature
        unsubscribe_sig = inspect.signature(unsubscribe)
        unsubscribe_params = list(unsubscribe_sig.parameters.keys())
        assert unsubscribe_params == ["channel"]

    @pytest.mark.asyncio
    async def test_publish_large_message(self, mock_redis_connection_manager):
        """Test publish operation with large message."""
        mock_redis = mock_redis_connection_manager
        mock_redis.publish.return_value = 1

        large_message = "x" * 10000  # 10KB message
        result = await publish("test_channel", large_message)

        mock_redis.publish.assert_called_once_with("test_channel", large_message)
        assert "Message published to channel 'test_channel'" in result

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/redis/mcp-redis'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.