Skip to main content
Glama
client.py11.7 kB
""" IRIS Redis Client Core Redis connection and utilities """ import json import logging from typing import Any, Dict, List, Optional, Union from contextlib import asynccontextmanager import redis.asyncio as redis from redis.asyncio import ConnectionPool from ..config.settings import settings logger = logging.getLogger(__name__) class RedisClient: """Redis client for IRIS with async support""" def __init__(self): self._pool: Optional[ConnectionPool] = None self._client: Optional[redis.Redis] = None async def connect(self) -> None: """Initialize Redis connection pool""" try: self._pool = ConnectionPool( host=settings.redis_host, port=settings.redis_port, db=settings.redis_db, password=settings.redis_password, max_connections=settings.redis_max_connections, decode_responses=True, encoding='utf-8' ) self._client = redis.Redis(connection_pool=self._pool) # Test connection await self._client.ping() logger.info(f"✅ Redis connected: {settings.redis_host}:{settings.redis_port}/{settings.redis_db}") except Exception as e: logger.error(f"❌ Redis connection failed: {e}") raise async def disconnect(self) -> None: """Close Redis connection""" if self._client: await self._client.aclose() logger.info("Redis connection closed") @property def client(self) -> redis.Redis: """Get Redis client instance""" if not self._client: raise RuntimeError("Redis client not connected. Call connect() first.") return self._client async def health_check(self) -> bool: """Check Redis connectivity""" try: await self.client.ping() return True except Exception as e: logger.error(f"Redis health check failed: {e}") return False # Key-Value Operations async def set(self, key: str, value: Any, ttl: Optional[int] = None) -> bool: """Set a key-value pair with optional TTL""" try: serialized_value = json.dumps(value) if not isinstance(value, str) else value result = await self.client.set(key, serialized_value, ex=ttl) return bool(result) except Exception as e: logger.error(f"Redis SET failed for key {key}: {e}") return False async def get(self, key: str, default: Any = None) -> Any: """Get value by key""" try: value = await self.client.get(key) if value is None: return default # Try to deserialize JSON, fallback to string try: return json.loads(value) except (json.JSONDecodeError, TypeError): return value except Exception as e: logger.error(f"Redis GET failed for key {key}: {e}") return default async def delete(self, *keys: str) -> int: """Delete one or more keys""" try: return await self.client.delete(*keys) except Exception as e: logger.error(f"Redis DELETE failed for keys {keys}: {e}") return 0 async def exists(self, key: str) -> bool: """Check if key exists""" try: return bool(await self.client.exists(key)) except Exception as e: logger.error(f"Redis EXISTS failed for key {key}: {e}") return False async def expire(self, key: str, ttl: int) -> bool: """Set TTL for existing key""" try: return bool(await self.client.expire(key, ttl)) except Exception as e: logger.error(f"Redis EXPIRE failed for key {key}: {e}") return False # List Operations async def lpush(self, key: str, *values: Any) -> int: """Push values to the left of a list""" try: serialized_values = [ json.dumps(v) if not isinstance(v, str) else v for v in values ] return await self.client.lpush(key, *serialized_values) except Exception as e: logger.error(f"Redis LPUSH failed for key {key}: {e}") return 0 async def rpush(self, key: str, *values: Any) -> int: """Push values to the right of a list""" try: serialized_values = [ json.dumps(v) if not isinstance(v, str) else v for v in values ] return await self.client.rpush(key, *serialized_values) except Exception as e: logger.error(f"Redis RPUSH failed for key {key}: {e}") return 0 async def lrange(self, key: str, start: int = 0, end: int = -1) -> List[Any]: """Get range of list elements""" try: values = await self.client.lrange(key, start, end) result = [] for value in values: try: result.append(json.loads(value)) except (json.JSONDecodeError, TypeError): result.append(value) return result except Exception as e: logger.error(f"Redis LRANGE failed for key {key}: {e}") return [] async def llen(self, key: str) -> int: """Get list length""" try: return await self.client.llen(key) except Exception as e: logger.error(f"Redis LLEN failed for key {key}: {e}") return 0 async def ltrim(self, key: str, start: int, end: int) -> bool: """Trim list to specified range""" try: result = await self.client.ltrim(key, start, end) return result == b'OK' or result == 'OK' except Exception as e: logger.error(f"Redis LTRIM failed for key {key}: {e}") return False # Hash Operations async def hset(self, key: str, field: str, value: Any) -> bool: """Set hash field""" try: serialized_value = json.dumps(value) if not isinstance(value, str) else value result = await self.client.hset(key, field, serialized_value) return bool(result) except Exception as e: logger.error(f"Redis HSET failed for key {key}, field {field}: {e}") return False async def hmset(self, key: str, mapping: Dict[str, Any]) -> bool: """Set multiple hash fields""" try: serialized_mapping = {} for field, value in mapping.items(): serialized_mapping[field] = json.dumps(value) if not isinstance(value, str) else value result = await self.client.hset(key, mapping=serialized_mapping) return bool(result) except Exception as e: logger.error(f"Redis HMSET failed for key {key}: {e}") return False async def hget(self, key: str, field: str, default: Any = None) -> Any: """Get hash field value""" try: value = await self.client.hget(key, field) if value is None: return default try: return json.loads(value) except (json.JSONDecodeError, TypeError): return value except Exception as e: logger.error(f"Redis HGET failed for key {key}, field {field}: {e}") return default async def hgetall(self, key: str) -> Dict[str, Any]: """Get all hash fields and values""" try: hash_data = await self.client.hgetall(key) result = {} for field, value in hash_data.items(): try: result[field] = json.loads(value) except (json.JSONDecodeError, TypeError): result[field] = value return result except Exception as e: logger.error(f"Redis HGETALL failed for key {key}: {e}") return {} async def hdel(self, key: str, *fields: str) -> int: """Delete hash fields""" try: return await self.client.hdel(key, *fields) except Exception as e: logger.error(f"Redis HDEL failed for key {key}, fields {fields}: {e}") return 0 async def hincrby(self, key: str, field: str, amount: int = 1) -> int: """Increment hash field by amount""" try: return await self.client.hincrby(key, field, amount) except Exception as e: logger.error(f"Redis HINCRBY failed for key {key}, field {field}: {e}") return 0 # Set Operations async def sadd(self, key: str, *values: Any) -> int: """Add values to set""" try: serialized_values = [ json.dumps(v) if not isinstance(v, str) else v for v in values ] return await self.client.sadd(key, *serialized_values) except Exception as e: logger.error(f"Redis SADD failed for key {key}: {e}") return 0 async def srem(self, key: str, *values: Any) -> int: """Remove values from set""" try: serialized_values = [ json.dumps(v) if not isinstance(v, str) else v for v in values ] return await self.client.srem(key, *serialized_values) except Exception as e: logger.error(f"Redis SREM failed for key {key}: {e}") return 0 async def smembers(self, key: str) -> List[Any]: """Get all set members""" try: values = await self.client.smembers(key) result = [] for value in values: try: result.append(json.loads(value)) except (json.JSONDecodeError, TypeError): result.append(value) return result except Exception as e: logger.error(f"Redis SMEMBERS failed for key {key}: {e}") return [] # Utility Methods async def keys(self, pattern: str = "*") -> List[str]: """Get keys matching pattern""" try: return await self.client.keys(pattern) except Exception as e: logger.error(f"Redis KEYS failed for pattern {pattern}: {e}") return [] async def flushdb(self) -> bool: """Clear current database (use with caution!)""" try: result = await self.client.flushdb() return result == b'OK' or result == 'OK' except Exception as e: logger.error(f"Redis FLUSHDB failed: {e}") return False # Global Redis client instance _redis_client: Optional[RedisClient] = None async def get_redis_client() -> RedisClient: """Get or create Redis client instance""" global _redis_client if _redis_client is None: _redis_client = RedisClient() await _redis_client.connect() return _redis_client @asynccontextmanager async def redis_context(): """Context manager for Redis operations""" client = await get_redis_client() try: yield client except Exception as e: logger.error(f"Redis context error: {e}") raise finally: # Connection is managed globally, don't close here pass

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/ilvolodel/iris-legacy'

If you have feedback or need assistance with the MCP directory API, please join our Discord server