Skip to main content
Glama
cache.py (6.9 kB)
"""
Redis caching middleware for Maximo API responses.

Implements cache-aside pattern with TTL management.
"""

import json
from typing import Any, Dict, Optional
from functools import wraps

import redis.asyncio as redis

from src.config import settings, CACHE_CONFIG
from src.utils.logger import get_logger

logger = get_logger(__name__)


class CacheManager:
    """Async Redis cache manager.

    Every public method is best-effort: Redis errors are logged and turned
    into a neutral return value (``None`` / ``False`` / ``0``) instead of
    being raised to the caller.
    """

    def __init__(self):
        # Mirrors the configuration flag, but is also flipped to False by
        # `_get_redis` when the first connection attempt fails.
        self.enabled = settings.redis_enabled
        # Created on first use by `_get_redis`.
        self._redis: Optional[redis.Redis] = None

    async def _get_redis(self) -> Optional[redis.Redis]:
        """Get or create the Redis connection.

        Returns:
            The connected client, or ``None`` when caching is disabled or
            the connection attempt failed.
        """
        if not self.enabled:
            return None

        if self._redis is None:
            try:
                self._redis = await redis.Redis(
                    host=settings.redis_host,
                    port=settings.redis_port,
                    db=settings.redis_db,
                    password=settings.redis_password,
                    decode_responses=True,
                    socket_connect_timeout=5,
                    socket_keepalive=True,
                )
                # Test connection
                await self._redis.ping()
                logger.info("Redis connection established")
            except Exception as e:
                logger.error("Failed to connect to Redis", error=str(e))
                self._redis = None
                # Disable caching so later calls short-circuit instead of
                # retrying the connection on every request.
                self.enabled = False

        return self._redis

    async def close(self):
        """Close the Redis connection, if one was opened."""
        # Detach the client first so a later call creates a fresh connection
        # instead of reusing a client that has already been closed.
        client, self._redis = self._redis, None
        if client is not None:
            await client.close()
            logger.info("Redis connection closed")

    async def get(self, key: str) -> Optional[Any]:
        """Get a value from the cache.

        Args:
            key: Cache key to look up.

        Returns:
            The JSON-decoded value, or ``None`` on a miss, when caching is
            disabled, or when any error occurs.
        """
        if not self.enabled:
            return None

        try:
            redis_client = await self._get_redis()
            if redis_client is None:
                return None

            value = await redis_client.get(key)
            if value is not None:
                logger.debug("Cache hit", key=key)
                return json.loads(value)

            logger.debug("Cache miss", key=key)
            return None
        except Exception as e:
            logger.error("Cache get error", key=key, error=str(e))
            return None

    async def set(
        self,
        key: str,
        value: Any,
        ttl: Optional[int] = None,
    ) -> bool:
        """Set a value in the cache, optionally with a TTL.

        Args:
            key: Cache key to write.
            value: Value to store; it is serialized with ``json.dumps``.
            ttl: Expiry in seconds. A falsy value (``None`` or ``0``)
                stores the key without an expiry.

        Returns:
            True when the value was written, False when caching is disabled
            or any error occurred (including a non-serializable value).
        """
        if not self.enabled:
            return False

        try:
            redis_client = await self._get_redis()
            if redis_client is None:
                return False

            serialized = json.dumps(value)
            if ttl:
                await redis_client.setex(key, ttl, serialized)
            else:
                await redis_client.set(key, serialized)

            logger.debug("Cache set", key=key, ttl=ttl)
            return True
        except Exception as e:
            logger.error("Cache set error", key=key, error=str(e))
            return False

    async def delete(self, key: str) -> bool:
        """Delete a key from the cache.

        Returns:
            True when the delete command was issued without error (whether
            or not the key existed), False otherwise.
        """
        if not self.enabled:
            return False

        try:
            redis_client = await self._get_redis()
            if redis_client is None:
                return False

            await redis_client.delete(key)
            logger.debug("Cache delete", key=key)
            return True
        except Exception as e:
            logger.error("Cache delete error", key=key, error=str(e))
            return False

    async def delete_pattern(self, pattern: str) -> int:
        """Delete all keys matching a pattern.

        Args:
            pattern: Match pattern passed to ``scan_iter``.

        Returns:
            The count reported by the delete command, or ``0`` when nothing
            matched, caching is disabled, or an error occurred.
        """
        if not self.enabled:
            return 0

        try:
            redis_client = await self._get_redis()
            if redis_client is None:
                return 0

            keys = [key async for key in redis_client.scan_iter(match=pattern)]
            if not keys:
                return 0

            deleted = await redis_client.delete(*keys)
            logger.info("Cache pattern delete", pattern=pattern, count=deleted)
            return deleted
        except Exception as e:
            logger.error("Cache pattern delete error", pattern=pattern, error=str(e))
            return 0

    async def health_check(self) -> bool:
        """Check whether Redis is accessible.

        Returns:
            True when caching is disabled or a ping succeeds, False otherwise.
        """
        # NOTE(review): `_get_redis` sets `enabled = False` after a failed
        # connection, so from then on this method reports True even though
        # Redis was configured and is unreachable - confirm this is intended.
        if not self.enabled:
            return True  # No Redis, no problem

        try:
            redis_client = await self._get_redis()
            if redis_client is None:
                return False

            await redis_client.ping()
            return True
        except Exception as e:
            logger.error("Redis health check failed", error=str(e))
            return False


# Global cache manager instance
_cache_manager: Optional[CacheManager] = None


def get_cache_manager() -> CacheManager:
    """Get the global cache manager instance, creating it on first use."""
    global _cache_manager
    if _cache_manager is None:
        _cache_manager = CacheManager()
    return _cache_manager


async def close_cache_manager():
    """Close and discard the global cache manager instance."""
    global _cache_manager
    if _cache_manager:
        await _cache_manager.close()
        _cache_manager = None


def _build_call_key(func_name: str, args: tuple, kwargs: dict) -> Optional[str]:
    """Build the cache key for one call of a ``@cached`` function.

    Primitive arguments (str, int, float, bool) are rendered with ``str``.
    dict/list/tuple arguments are rendered as canonical JSON so that calls
    differing only in such an argument get different keys. Any other object
    (for example a bound ``self``, or ``None``) is left out of the key.

    Returns:
        The key, or ``None`` when a container argument cannot be serialized
        and the call therefore must not be cached.
    """

    def fragment(value: Any) -> Optional[str]:
        # Returns "" for values that are intentionally left out of the key,
        # None when the value makes the call uncacheable.
        if isinstance(value, (str, int, float, bool)):
            return str(value)
        if isinstance(value, (dict, list, tuple)):
            try:
                return json.dumps(value, sort_keys=True, separators=(",", ":"))
            except (TypeError, ValueError):
                return None
        return ""

    parts = [func_name]

    # Positional args
    for arg in args:
        piece = fragment(arg)
        if piece is None:
            return None
        if piece != "" or isinstance(arg, str):
            parts.append(piece)

    # Keyword args (sorted for consistency)
    for name, value in sorted(kwargs.items()):
        piece = fragment(value)
        if piece is None:
            return None
        if piece != "" or isinstance(value, str):
            parts.append(f"{name}={piece}")

    return ":".join(parts)


def cached(cache_type: str):
    """
    Decorator for caching the results of async functions.

    The cache key is built from the function name and its arguments. The
    TTL is read from ``CACHE_CONFIG[cache_type]["ttl"]`` and defaults to
    300 seconds when that entry is missing. A result of ``None`` is never
    served from the cache, so such calls always re-run the function.

    Usage:
        @cached('asset_detail')
        async def get_asset(asset_num: str):
            return await maximo_client.get(f'/asset/{asset_num}')
    """

    def decorator(func):
        @wraps(func)
        async def wrapper(*args, **kwargs):
            cache_key = _build_call_key(func.__name__, args, kwargs)
            if cache_key is None:
                # An argument could not be represented in the key; caching
                # this call could hand back another call's result.
                logger.debug("Skipping cache, arguments not serializable", function=func.__name__)
                return await func(*args, **kwargs)

            # Try to get from cache
            cache_manager = get_cache_manager()
            cached_value = await cache_manager.get(cache_key)
            if cached_value is not None:
                logger.debug("Returning cached result", function=func.__name__, cache_key=cache_key)
                return cached_value

            # Execute function
            result = await func(*args, **kwargs)

            # Cache the result
            ttl = CACHE_CONFIG.get(cache_type, {}).get("ttl", 300)
            await cache_manager.set(cache_key, result, ttl=ttl)
            return result

        return wrapper

    return decorator


def build_cache_key(*parts: str) -> str:
    """Build a cache key by joining the stringified parts with ':'."""
    return ":".join(str(p) for p in parts)

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/nick0918964388/mcp-maximo-server'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.