Skip to main content
Glama

Path of Exile 2 Build Optimizer MCP

cache_manager.py10.4 kB
""" Multi-tier caching system Supports in-memory, Redis, and SQLite caching """ import asyncio import json import logging import pickle import sqlite3 from typing import Any, Optional, Dict from datetime import datetime, timedelta from pathlib import Path import aiosqlite try: from ..config import settings, CACHE_DIR except ImportError: from src.config import settings, CACHE_DIR logger = logging.getLogger(__name__) class CacheManager: """ Multi-tier cache manager L1: In-memory (fastest, limited size) L2: Redis (optional, shared across instances) L3: SQLite (persistent, slower) """ def __init__(self, enable_redis: bool = False): self.enable_redis = enable_redis and settings.REDIS_ENABLED # L1: In-memory cache self.memory_cache: Dict[str, tuple[Any, datetime]] = {} self.max_memory_items = 1000 # L2: Redis client (optional) self.redis_client = None # L3: SQLite cache self.sqlite_path = CACHE_DIR / "cache.db" self.sqlite_conn: Optional[aiosqlite.Connection] = None async def initialize(self): """Initialize cache connections""" try: # Initialize Redis if enabled if self.enable_redis: try: import aioredis self.redis_client = await aioredis.from_url( settings.REDIS_URL, encoding="utf-8", decode_responses=True ) logger.info("Redis cache initialized") except Exception as e: logger.warning(f"Failed to connect to Redis: {e}") self.enable_redis = False # Initialize SQLite cache self.sqlite_conn = await aiosqlite.connect(str(self.sqlite_path)) await self._init_sqlite_schema() logger.info("SQLite cache initialized") except Exception as e: logger.error(f"Cache initialization failed: {e}") async def _init_sqlite_schema(self): """Initialize SQLite cache schema""" await self.sqlite_conn.execute(""" CREATE TABLE IF NOT EXISTS cache ( key TEXT PRIMARY KEY, value BLOB, expires_at TIMESTAMP, created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ) """) await self.sqlite_conn.execute(""" CREATE INDEX IF NOT EXISTS idx_expires_at ON cache(expires_at) """) await self.sqlite_conn.commit() async def get(self, key: str) -> Optional[Any]: """ Get value from cache (checks all tiers) Args: key: Cache key Returns: Cached value or None if not found/expired """ # Try L1: Memory cache if key in self.memory_cache: value, expires_at = self.memory_cache[key] if datetime.now() < expires_at: logger.debug(f"L1 cache hit: {key}") return value else: # Expired, remove from memory del self.memory_cache[key] # Try L2: Redis cache if self.redis_client: try: value = await self.redis_client.get(key) if value: logger.debug(f"L2 cache hit: {key}") # Deserialize deserialized = json.loads(value) # Store in L1 for next time self._set_memory_cache(key, deserialized, 300) # 5 min in L1 return deserialized except Exception as e: logger.warning(f"Redis get error: {e}") # Try L3: SQLite cache if self.sqlite_conn: try: async with self.sqlite_conn.execute( "SELECT value, expires_at FROM cache WHERE key = ?", (key,) ) as cursor: row = await cursor.fetchone() if row: value_blob, expires_at_str = row expires_at = datetime.fromisoformat(expires_at_str) if datetime.now() < expires_at: logger.debug(f"L3 cache hit: {key}") # Deserialize value = pickle.loads(value_blob) # Store in higher tiers for next time self._set_memory_cache(key, value, 300) return value else: # Expired, delete from SQLite await self.sqlite_conn.execute( "DELETE FROM cache WHERE key = ?", (key,) ) await self.sqlite_conn.commit() except Exception as e: logger.warning(f"SQLite get error: {e}") return None async def set(self, key: str, value: Any, ttl: int = 3600): """ Set value in cache (all tiers) Args: key: Cache key value: Value to cache ttl: Time to live in seconds """ expires_at = datetime.now() + timedelta(seconds=ttl) # Set in L1: Memory cache self._set_memory_cache(key, value, ttl) # Set in L2: Redis cache if self.redis_client: try: serialized = json.dumps(value) await self.redis_client.setex(key, ttl, serialized) except Exception as e: logger.warning(f"Redis set error: {e}") # Set in L3: SQLite cache if self.sqlite_conn: try: value_blob = pickle.dumps(value) await self.sqlite_conn.execute( "INSERT OR REPLACE INTO cache (key, value, expires_at) VALUES (?, ?, ?)", (key, value_blob, expires_at.isoformat()) ) await self.sqlite_conn.commit() except Exception as e: logger.warning(f"SQLite set error: {e}") def _set_memory_cache(self, key: str, value: Any, ttl: int): """Set value in memory cache with size limit""" # If cache is full, remove oldest items if len(self.memory_cache) >= self.max_memory_items: # Remove expired items first now = datetime.now() expired_keys = [ k for k, (_, exp) in self.memory_cache.items() if exp < now ] for k in expired_keys: del self.memory_cache[k] # If still full, remove oldest 10% if len(self.memory_cache) >= self.max_memory_items: to_remove = int(self.max_memory_items * 0.1) for k in list(self.memory_cache.keys())[:to_remove]: del self.memory_cache[k] expires_at = datetime.now() + timedelta(seconds=ttl) self.memory_cache[key] = (value, expires_at) async def delete(self, key: str): """Delete key from all cache tiers""" # Delete from L1 if key in self.memory_cache: del self.memory_cache[key] # Delete from L2 if self.redis_client: try: await self.redis_client.delete(key) except Exception as e: logger.warning(f"Redis delete error: {e}") # Delete from L3 if self.sqlite_conn: try: await self.sqlite_conn.execute( "DELETE FROM cache WHERE key = ?", (key,) ) await self.sqlite_conn.commit() except Exception as e: logger.warning(f"SQLite delete error: {e}") async def clear(self): """Clear all caches""" # Clear L1 self.memory_cache.clear() # Clear L2 if self.redis_client: try: await self.redis_client.flushdb() except Exception as e: logger.warning(f"Redis clear error: {e}") # Clear L3 if self.sqlite_conn: try: await self.sqlite_conn.execute("DELETE FROM cache") await self.sqlite_conn.commit() except Exception as e: logger.warning(f"SQLite clear error: {e}") async def cleanup_expired(self): """Remove expired entries from all caches""" # L1 cleanup now = datetime.now() expired_keys = [ k for k, (_, exp) in self.memory_cache.items() if exp < now ] for k in expired_keys: del self.memory_cache[k] # L3 cleanup (Redis handles expiry automatically) if self.sqlite_conn: try: await self.sqlite_conn.execute( "DELETE FROM cache WHERE expires_at < ?", (now.isoformat(),) ) await self.sqlite_conn.commit() except Exception as e: logger.warning(f"SQLite cleanup error: {e}") async def get_statistics(self) -> Dict[str, Any]: """Get cache statistics""" stats = { "l1_memory_items": len(self.memory_cache), "l1_max_items": self.max_memory_items } if self.sqlite_conn: try: async with self.sqlite_conn.execute( "SELECT COUNT(*) FROM cache" ) as cursor: row = await cursor.fetchone() stats["l3_sqlite_items"] = row[0] if row else 0 except Exception as e: logger.warning(f"SQLite stats error: {e}") return stats async def close(self): """Close cache connections""" if self.redis_client: try: await self.redis_client.close() except Exception as e: logger.warning(f"Redis close error: {e}") if self.sqlite_conn: try: await self.sqlite_conn.close() except Exception as e: logger.warning(f"SQLite close error: {e}")

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/HivemindOverlord/poe2-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server