Skip to main content
Glama
connection.py4.25 kB
"""Redis connection and pooling management. This module provides the RedisCache class for managing Redis connections with connection pooling and graceful error handling. """ import os from typing import Optional import redis.asyncio as redis from redis.asyncio.connection import ConnectionPool import structlog logger = structlog.get_logger(__name__) class RedisCache: """ Redis cache connection manager with connection pooling. Provides connection pool management, health checks, and graceful error handling for Redis operations. Attributes: pool: Redis connection pool client: Redis client instance """ def __init__(self) -> None: """Initialize Redis cache with connection pooling.""" self.pool: Optional[ConnectionPool] = None self.client: Optional[redis.Redis] = None self._initialize_pool() def _initialize_pool(self) -> None: """ Initialize Redis connection pool. Configures connection pool from REDIS_URL environment variable with optimal settings for the MCP server. """ redis_url = os.getenv("REDIS_URL", "redis://localhost:6379/0") try: self.pool = ConnectionPool.from_url( redis_url, max_connections=20, # Support high concurrency decode_responses=True, # Auto-decode bytes to str socket_timeout=5, # 5 second socket timeout socket_connect_timeout=5, # 5 second connect timeout retry_on_timeout=True, # Retry on timeout ) self.client = redis.Redis(connection_pool=self.pool) logger.info( "redis_pool_initialized", max_connections=20, redis_url=redis_url.split("@")[-1], # Don't log credentials ) except Exception as e: logger.error( "redis_pool_initialization_failed", error=str(e), error_type=type(e).__name__, ) # Don't raise - fail open (cache unavailable but server continues) self.client = None self.pool = None async def ping(self) -> bool: """ Check Redis connection health. Returns: True if Redis is healthy, False otherwise Example: >>> cache = RedisCache() >>> is_healthy = await cache.ping() >>> print(f"Redis healthy: {is_healthy}") """ if not self.client: logger.warning("redis_ping_failed", reason="client_not_initialized") return False try: result = await self.client.ping() logger.debug("redis_ping_success", result=result) return result except Exception as e: logger.error( "redis_ping_failed", error=str(e), error_type=type(e).__name__, ) return False async def close(self) -> None: """ Close Redis connection pool gracefully. Should be called during application shutdown to ensure all connections are properly closed. Example: >>> cache = RedisCache() >>> # ... use cache ... >>> await cache.close() """ try: if self.client: await self.client.close() logger.info("redis_client_closed") if self.pool: await self.pool.disconnect() logger.info("redis_pool_disconnected") except Exception as e: logger.error( "redis_close_error", error=str(e), error_type=type(e).__name__, ) def is_available(self) -> bool: """ Check if Redis client is available. Returns: True if client is initialized, False otherwise Note: This only checks if the client exists, not if Redis is reachable. Use ping() for a real health check. """ return self.client is not None # Global Redis cache instance (singleton pattern) cache = RedisCache()

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/padak/apify-actor-reddit-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server