#!/usr/bin/env python3
"""
Test Redis Connection and Basic Operations
"""
import asyncio
import sys
import os
from datetime import datetime
# Add project root to path
project_root = os.path.join(os.path.dirname(__file__), '..')
sys.path.insert(0, project_root)
from src.redis_client.client import RedisClient, get_redis_client
import logging
# Setup logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
async def test_redis_connection():
"""Test basic Redis connection"""
logger.info("π Testing Redis connection...")
try:
client = RedisClient()
await client.connect()
# Health check
is_healthy = await client.health_check()
if is_healthy:
logger.info("β
Redis connection successful!")
return client
else:
logger.error("β Redis health check failed!")
return None
except Exception as e:
logger.error(f"β Redis connection failed: {e}")
return None
async def test_basic_operations(client: RedisClient):
"""Test basic Redis operations"""
logger.info("π§ͺ Testing basic Redis operations...")
try:
# Test SET/GET
test_key = "iris:test:basic"
test_value = {"message": "Hello IRIS!", "timestamp": datetime.now().isoformat()}
# Set value
success = await client.set(test_key, test_value, ttl=60)
assert success, "SET operation failed"
logger.info(f"β
SET: {test_key}")
# Get value
retrieved = await client.get(test_key)
assert retrieved == test_value, f"GET mismatch: {retrieved} != {test_value}"
logger.info(f"β
GET: {retrieved}")
# Test EXISTS
exists = await client.exists(test_key)
assert exists, "EXISTS check failed"
logger.info("β
EXISTS check passed")
# Test DELETE
deleted = await client.delete(test_key)
assert deleted == 1, "DELETE operation failed"
logger.info("β
DELETE operation successful")
# Verify deletion
retrieved_after_delete = await client.get(test_key)
assert retrieved_after_delete is None, "Key still exists after deletion"
logger.info("β
Key properly deleted")
except Exception as e:
logger.error(f"β Basic operations test failed: {e}")
raise
async def test_list_operations(client: RedisClient):
"""Test Redis list operations"""
logger.info("π Testing list operations...")
try:
list_key = "iris:test:messages"
# Test LPUSH
messages = [
{"role": "user", "content": "Hello!"},
{"role": "assistant", "content": "Hi there!"},
{"role": "user", "content": "How are you?"}
]
for msg in messages:
await client.lpush(list_key, msg)
logger.info(f"β
LPUSH: Added {len(messages)} messages")
# Test LLEN
length = await client.llen(list_key)
assert length == len(messages), f"List length mismatch: {length} != {len(messages)}"
logger.info(f"β
LLEN: {length}")
# Test LRANGE
retrieved_messages = await client.lrange(list_key)
assert len(retrieved_messages) == len(messages), "LRANGE length mismatch"
logger.info(f"β
LRANGE: Retrieved {len(retrieved_messages)} messages")
# Test LTRIM (keep last 2 messages)
await client.ltrim(list_key, 0, 1)
trimmed_length = await client.llen(list_key)
assert trimmed_length == 2, f"LTRIM failed: {trimmed_length} != 2"
logger.info("β
LTRIM: Trimmed to 2 messages")
# Cleanup
await client.delete(list_key)
logger.info("β
List operations cleanup complete")
except Exception as e:
logger.error(f"β List operations test failed: {e}")
raise
async def test_hash_operations(client: RedisClient):
"""Test Redis hash operations"""
logger.info("ποΈ Testing hash operations...")
try:
hash_key = "iris:test:user:123456789"
# Test HSET
user_data = {
"username": "test_user",
"first_name": "Test",
"preferences": {"theme": "dark", "notifications": True},
"last_activity": datetime.now().isoformat()
}
for field, value in user_data.items():
await client.hset(hash_key, field, value)
logger.info(f"β
HSET: Set {len(user_data)} fields")
# Test HGET
username = await client.hget(hash_key, "username")
assert username == "test_user", f"HGET mismatch: {username} != test_user"
logger.info(f"β
HGET: {username}")
# Test HGETALL
all_data = await client.hgetall(hash_key)
assert len(all_data) == len(user_data), "HGETALL length mismatch"
logger.info(f"β
HGETALL: Retrieved {len(all_data)} fields")
# Test HDEL
deleted_fields = await client.hdel(hash_key, "last_activity")
assert deleted_fields == 1, "HDEL failed"
logger.info("β
HDEL: Deleted field")
# Cleanup
await client.delete(hash_key)
logger.info("β
Hash operations cleanup complete")
except Exception as e:
logger.error(f"β Hash operations test failed: {e}")
raise
async def test_ttl_operations(client: RedisClient):
"""Test TTL operations"""
logger.info("β° Testing TTL operations...")
try:
ttl_key = "iris:test:ttl"
# Set with TTL
await client.set(ttl_key, "temporary_value", ttl=2)
logger.info("β
SET with TTL: 2 seconds")
# Check exists immediately
exists_before = await client.exists(ttl_key)
assert exists_before, "Key should exist immediately after SET"
logger.info("β
Key exists immediately after SET")
# Wait and check expiration
logger.info("β³ Waiting 3 seconds for TTL expiration...")
await asyncio.sleep(3)
exists_after = await client.exists(ttl_key)
assert not exists_after, "Key should have expired"
logger.info("β
Key expired after TTL")
except Exception as e:
logger.error(f"β TTL operations test failed: {e}")
raise
async def main():
"""Run all Redis tests"""
try:
logger.info("π Starting Redis Tests...")
# Test connection
client = await test_redis_connection()
if not client:
return False
# Run all tests
await test_basic_operations(client)
await test_list_operations(client)
await test_hash_operations(client)
await test_ttl_operations(client)
# Cleanup and disconnect
await client.disconnect()
logger.info("π All Redis tests passed!")
return True
except Exception as e:
logger.error(f"β Redis tests failed: {e}")
return False
if __name__ == "__main__":
success = asyncio.run(main())
sys.exit(0 if success else 1)