Skip to main content
Glama
test_redis.py • 7.31 kB
#!/usr/bin/env python3
"""
Test Redis Connection and Basic Operations

Smoke-test script for the project's async ``RedisClient`` wrapper. It
connects, exercises key/value, list, hash and TTL operations on keys under
the ``iris:test:`` prefix, and exits with status 0 when every check passes
or 1 otherwise.
"""

import asyncio
import logging
import os
import sys
from datetime import datetime
from typing import Optional

# Add project root to path
project_root = os.path.join(os.path.dirname(__file__), '..')
sys.path.insert(0, project_root)

# Must come after the sys.path tweak above. ``get_redis_client`` is not used
# by the tests; the import is kept as it was in the original script.
from src.redis_client.client import RedisClient, get_redis_client  # noqa: E402,F401

# Setup logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


async def _best_effort_delete(client: RedisClient, key: str) -> None:
    """Delete *key*, logging (not raising) on error.

    Used on failure paths so that cleanup never masks the original test error.
    """
    try:
        await client.delete(key)
    except Exception as e:
        logger.warning(f"Cleanup of {key} failed: {e}")


async def _best_effort_disconnect(client: RedisClient) -> None:
    """Disconnect *client*, logging (not raising) on error."""
    try:
        await client.disconnect()
    except Exception as e:
        logger.warning(f"Redis disconnect failed: {e}")


async def test_redis_connection() -> Optional[RedisClient]:
    """Test basic Redis connection.

    Returns:
        A connected ``RedisClient`` when the connection and health check
        succeed, otherwise ``None``. Errors are logged, never raised.
    """
    logger.info("🔌 Testing Redis connection...")

    client = None
    connected = False
    try:
        client = RedisClient()
        await client.connect()
        connected = True

        # Health check
        if await client.health_check():
            logger.info("✅ Redis connection successful!")
            return client

        logger.error("❌ Redis health check failed!")
    except Exception as e:
        logger.error(f"❌ Redis connection failed: {e}")

    # The caller only receives None here, so it cannot release the
    # connection itself; do it now if connect() had succeeded.
    if connected:
        await _best_effort_disconnect(client)
    return None


async def test_basic_operations(client: RedisClient) -> None:
    """Test basic Redis operations (SET, GET, EXISTS, DELETE).

    Args:
        client: A connected ``RedisClient``.

    Raises:
        Exception: Re-raises any assertion or client error after logging it.
    """
    logger.info("🧪 Testing basic Redis operations...")

    test_key = "iris:test:basic"
    try:
        # Test SET/GET
        test_value = {
            "message": "Hello IRIS!",
            "timestamp": datetime.now().isoformat(),
        }

        # Set value
        success = await client.set(test_key, test_value, ttl=60)
        assert success, "SET operation failed"
        logger.info(f"✅ SET: {test_key}")

        # Get value (the dict is expected to round-trip unchanged)
        retrieved = await client.get(test_key)
        assert retrieved == test_value, f"GET mismatch: {retrieved} != {test_value}"
        logger.info(f"✅ GET: {retrieved}")

        # Test EXISTS
        exists = await client.exists(test_key)
        assert exists, "EXISTS check failed"
        logger.info("✅ EXISTS check passed")

        # Test DELETE
        deleted = await client.delete(test_key)
        assert deleted == 1, "DELETE operation failed"
        logger.info("✅ DELETE operation successful")

        # Verify deletion
        retrieved_after_delete = await client.get(test_key)
        assert retrieved_after_delete is None, "Key still exists after deletion"
        logger.info("✅ Key properly deleted")

    except Exception as e:
        logger.error(f"❌ Basic operations test failed: {e}")
        # Do not leave the test key behind when the run aborts midway.
        await _best_effort_delete(client, test_key)
        raise


async def test_list_operations(client: RedisClient) -> None:
    """Test Redis list operations (LPUSH, LLEN, LRANGE, LTRIM).

    Args:
        client: A connected ``RedisClient``.

    Raises:
        Exception: Re-raises any assertion or client error after logging it.
    """
    logger.info("📝 Testing list operations...")

    list_key = "iris:test:messages"
    try:
        # LPUSH appends to an existing list, so leftovers from an aborted
        # earlier run would break the length checks below; start clean.
        await client.delete(list_key)

        # Test LPUSH
        messages = [
            {"role": "user", "content": "Hello!"},
            {"role": "assistant", "content": "Hi there!"},
            {"role": "user", "content": "How are you?"},
        ]

        for msg in messages:
            await client.lpush(list_key, msg)
        logger.info(f"✅ LPUSH: Added {len(messages)} messages")

        # Test LLEN
        length = await client.llen(list_key)
        assert length == len(messages), f"List length mismatch: {length} != {len(messages)}"
        logger.info(f"✅ LLEN: {length}")

        # Test LRANGE
        retrieved_messages = await client.lrange(list_key)
        assert len(retrieved_messages) == len(messages), "LRANGE length mismatch"
        logger.info(f"✅ LRANGE: Retrieved {len(retrieved_messages)} messages")

        # Test LTRIM (keep last 2 messages)
        await client.ltrim(list_key, 0, 1)
        trimmed_length = await client.llen(list_key)
        assert trimmed_length == 2, f"LTRIM failed: {trimmed_length} != 2"
        logger.info("✅ LTRIM: Trimmed to 2 messages")

        # Cleanup
        await client.delete(list_key)
        logger.info("✅ List operations cleanup complete")

    except Exception as e:
        logger.error(f"❌ List operations test failed: {e}")
        await _best_effort_delete(client, list_key)
        raise


async def test_hash_operations(client: RedisClient) -> None:
    """Test Redis hash operations (HSET, HGET, HGETALL, HDEL).

    Args:
        client: A connected ``RedisClient``.

    Raises:
        Exception: Re-raises any assertion or client error after logging it.
    """
    logger.info("🗂️ Testing hash operations...")

    hash_key = "iris:test:user:123456789"
    try:
        # Remove any hash left by an aborted earlier run so the field count
        # checked by HGETALL reflects only what this test writes.
        await client.delete(hash_key)

        # Test HSET
        user_data = {
            "username": "test_user",
            "first_name": "Test",
            "preferences": {"theme": "dark", "notifications": True},
            "last_activity": datetime.now().isoformat(),
        }

        for field, value in user_data.items():
            await client.hset(hash_key, field, value)
        logger.info(f"✅ HSET: Set {len(user_data)} fields")

        # Test HGET
        username = await client.hget(hash_key, "username")
        assert username == "test_user", f"HGET mismatch: {username} != test_user"
        logger.info(f"✅ HGET: {username}")

        # Test HGETALL
        all_data = await client.hgetall(hash_key)
        assert len(all_data) == len(user_data), "HGETALL length mismatch"
        logger.info(f"✅ HGETALL: Retrieved {len(all_data)} fields")

        # Test HDEL
        deleted_fields = await client.hdel(hash_key, "last_activity")
        assert deleted_fields == 1, "HDEL failed"
        logger.info("✅ HDEL: Deleted field")

        # Cleanup
        await client.delete(hash_key)
        logger.info("✅ Hash operations cleanup complete")

    except Exception as e:
        logger.error(f"❌ Hash operations test failed: {e}")
        await _best_effort_delete(client, hash_key)
        raise


async def test_ttl_operations(client: RedisClient) -> None:
    """Test TTL operations.

    Sets a key with a 2-second TTL, checks it exists, sleeps 3 seconds and
    checks it is gone.

    Args:
        client: A connected ``RedisClient``.

    Raises:
        Exception: Re-raises any assertion or client error after logging it.
    """
    logger.info("⏰ Testing TTL operations...")

    ttl_key = "iris:test:ttl"
    try:
        # Set with TTL
        await client.set(ttl_key, "temporary_value", ttl=2)
        logger.info("✅ SET with TTL: 2 seconds")

        # Check exists immediately
        exists_before = await client.exists(ttl_key)
        assert exists_before, "Key should exist immediately after SET"
        logger.info("✅ Key exists immediately after SET")

        # Wait and check expiration
        logger.info("⏳ Waiting 3 seconds for TTL expiration...")
        await asyncio.sleep(3)

        exists_after = await client.exists(ttl_key)
        assert not exists_after, "Key should have expired"
        logger.info("✅ Key expired after TTL")

    except Exception as e:
        logger.error(f"❌ TTL operations test failed: {e}")
        # If expiry did not happen (or was never set), remove the key by hand.
        await _best_effort_delete(client, ttl_key)
        raise


async def main() -> bool:
    """Run all Redis tests.

    Returns:
        ``True`` when the connection and every test succeed, ``False``
        otherwise. Exceptions are logged and converted to ``False``.
    """
    try:
        logger.info("🚀 Starting Redis Tests...")

        # Test connection
        client = await test_redis_connection()
        if not client:
            return False

        try:
            # Run all tests
            await test_basic_operations(client)
            await test_list_operations(client)
            await test_hash_operations(client)
            await test_ttl_operations(client)
        finally:
            # Disconnect even when a test raised, so the connection is not
            # leaked; a disconnect error still fails the run via the outer
            # handler, as before.
            await client.disconnect()

        logger.info("🎉 All Redis tests passed!")
        return True

    except Exception as e:
        logger.error(f"❌ Redis tests failed: {e}")
        return False


if __name__ == "__main__":
    success = asyncio.run(main())
    sys.exit(0 if success else 1)

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/ilvolodel/iris-legacy'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.