Skip to main content
Glama

Neolibrarian MCP

by pshap
performance_cache.py — 14.3 kB
"""Performance Caching Layer for Calibre Library API

Provides intelligent caching for frequent queries and operations to improve
response times for large libraries (107k+ books).
"""

import time
import json
import hashlib
import logging
from typing import Any, Dict, Optional, List, Tuple
from collections import OrderedDict
from dataclasses import dataclass, asdict
from threading import RLock
from pathlib import Path

logger = logging.getLogger(__name__)


@dataclass
class CacheEntry:
    """Cache entry with metadata."""

    data: Any                          # cached payload
    created_at: float                  # epoch seconds at insertion time
    access_count: int                  # number of cache hits for this entry
    last_accessed: float               # epoch seconds of the most recent hit
    ttl_seconds: Optional[int] = None  # None means the entry never expires

    def is_expired(self) -> bool:
        """Check if the cache entry has outlived its TTL."""
        if self.ttl_seconds is None:
            return False
        return (time.time() - self.created_at) > self.ttl_seconds

    def touch(self) -> None:
        """Update access statistics after a cache hit."""
        self.access_count += 1
        self.last_accessed = time.time()


class LRUCache:
    """Thread-safe LRU cache with per-entry TTL support."""

    def __init__(self, max_size: int = 1000, default_ttl: int = 300):
        """Initialize LRU cache.

        Args:
            max_size: Maximum number of entries.
            default_ttl: Default time-to-live in seconds.
        """
        self.max_size = max_size
        self.default_ttl = default_ttl
        self._cache: OrderedDict[str, CacheEntry] = OrderedDict()
        # RLock so a thread already holding the lock can re-enter safely.
        self._lock = RLock()
        self._hits = 0
        self._misses = 0

    def get(self, key: str) -> Optional[Any]:
        """Return the cached value for *key*, or None on miss or expiry."""
        with self._lock:
            if key not in self._cache:
                self._misses += 1
                return None

            entry = self._cache[key]

            # Expired entries are removed lazily on access and count as misses.
            if entry.is_expired():
                del self._cache[key]
                self._misses += 1
                return None

            # Move to end (most recently used position).
            self._cache.move_to_end(key)
            entry.touch()
            self._hits += 1
            return entry.data

    def put(self, key: str, value: Any, ttl: Optional[int] = None) -> None:
        """Store *value* under *key*.

        Args:
            key: Cache key.
            value: Value to store.
            ttl: Entry lifetime in seconds; defaults to ``default_ttl``.
        """
        with self._lock:
            if ttl is None:
                ttl = self.default_ttl

            # BUGFIX: drop an existing entry for this key first so that
            # updating a key while at capacity does not evict an unrelated
            # entry (or the key being updated, if it happened to be oldest).
            if key in self._cache:
                del self._cache[key]

            # Evict least-recently-used entries until there is room.
            while len(self._cache) >= self.max_size:
                self._cache.popitem(last=False)

            now = time.time()
            # Insertion places the entry at the MRU (end) position.
            self._cache[key] = CacheEntry(
                data=value,
                created_at=now,
                access_count=0,
                last_accessed=now,
                ttl_seconds=ttl,
            )

    def invalidate(self, key: str) -> bool:
        """Remove *key* from the cache. Returns True if it was present."""
        with self._lock:
            if key in self._cache:
                del self._cache[key]
                return True
            return False

    def clear(self) -> None:
        """Clear all cached entries and reset hit/miss counters."""
        with self._lock:
            self._cache.clear()
            self._hits = 0
            self._misses = 0

    def get_stats(self) -> Dict[str, Any]:
        """Return hit/miss/size statistics for this cache."""
        with self._lock:
            total_requests = self._hits + self._misses
            hit_rate = (self._hits / total_requests * 100) if total_requests > 0 else 0
            return {
                "hits": self._hits,
                "misses": self._misses,
                "total_requests": total_requests,
                "hit_rate_percent": round(hit_rate, 2),
                "current_size": len(self._cache),
                "max_size": self.max_size,
            }


class QueryCache:
    """Specialized cache for query results with intelligent invalidation."""

    def __init__(self, max_size: int = 500):
        """Initialize query cache.

        Args:
            max_size: Maximum number of cached query results.
        """
        self.cache = LRUCache(max_size, default_ttl=600)  # 10 minute TTL
        # Pattern -> list of query keys that matched that pattern; used to
        # invalidate all cached queries touching an author/series/text term.
        self._query_patterns: Dict[str, List[str]] = {}

    def _extract_query_patterns(self, query_dict: Dict[str, Any]) -> List[str]:
        """Extract cacheable invalidation patterns from a query dict."""
        patterns = []

        # Author-based patterns
        if query_dict.get("metadata_filters", {}).get("authors"):
            for author in query_dict["metadata_filters"]["authors"]:
                patterns.append(f"author:{author.lower()}")

        # Series-based patterns
        if query_dict.get("metadata_filters", {}).get("series"):
            for series in query_dict["metadata_filters"]["series"]:
                patterns.append(f"series:{series.lower()}")

        # Text search patterns
        if query_dict.get("text_search", {}).get("query"):
            query_text = query_dict["text_search"]["query"].lower()
            patterns.append(f"text:{query_text}")

        return patterns

    def _track_patterns(self, query_key: str, query_dict: Dict[str, Any]) -> None:
        """Record *query_key* under every pattern the query matches.

        Shared by get/cache paths (was duplicated inline in both).
        """
        for pattern in self._extract_query_patterns(query_dict):
            keys = self._query_patterns.setdefault(pattern, [])
            if query_key not in keys:
                keys.append(query_key)

    def get_query_result(self, query_key: str, query_dict: Dict[str, Any]) -> Optional[Any]:
        """Return the cached result for *query_key*, tracking its patterns."""
        result = self.cache.get(query_key)
        if result is not None:
            # Track query patterns for later invalidation.
            self._track_patterns(query_key, query_dict)
        return result

    def cache_query_result(self, query_key: str, query_dict: Dict[str, Any],
                           result: Any, ttl: Optional[int] = None) -> None:
        """Cache a query result and track its invalidation patterns."""
        self.cache.put(query_key, result, ttl)
        self._track_patterns(query_key, query_dict)

    def invalidate_by_pattern(self, pattern: str) -> int:
        """Invalidate all cached queries matching *pattern*.

        Returns:
            Number of cache entries actually removed.
        """
        if pattern not in self._query_patterns:
            return 0

        invalidated = 0
        for query_key in self._query_patterns[pattern]:
            if self.cache.invalidate(query_key):
                invalidated += 1

        del self._query_patterns[pattern]
        return invalidated


class MetadataCache:
    """Cache for frequently accessed book/author/series metadata."""

    def __init__(self, max_size: int = 10000):
        """Initialize metadata cache.

        Args:
            max_size: Capacity of the book cache; author/series caches get
                one tenth of this.
        """
        self.book_cache = LRUCache(max_size, default_ttl=3600)          # 1 hour TTL
        self.author_cache = LRUCache(max_size // 10, default_ttl=1800)  # 30 min TTL
        self.series_cache = LRUCache(max_size // 10, default_ttl=1800)  # 30 min TTL

    def get_book(self, book_id: int) -> Optional[Dict[str, Any]]:
        """Get cached book metadata."""
        return self.book_cache.get(f"book:{book_id}")

    def cache_book(self, book_id: int, metadata: Dict[str, Any]) -> None:
        """Cache book metadata."""
        self.book_cache.put(f"book:{book_id}", metadata)

    def get_author_books(self, author: str) -> Optional[List[int]]:
        """Get cached list of book IDs for an author."""
        return self.author_cache.get(f"author:{author.lower()}")

    def cache_author_books(self, author: str, book_ids: List[int]) -> None:
        """Cache author's book list."""
        self.author_cache.put(f"author:{author.lower()}", book_ids)

    def get_series_books(self, series: str) -> Optional[List[int]]:
        """Get cached list of book IDs for a series."""
        return self.series_cache.get(f"series:{series.lower()}")

    def cache_series_books(self, series: str, book_ids: List[int]) -> None:
        """Cache series book list."""
        self.series_cache.put(f"series:{series.lower()}", book_ids)


class PerformanceMonitor:
    """Monitor and log query performance metrics."""

    def __init__(self):
        """Initialize performance monitor."""
        self.query_times: List[float] = []
        self.slow_queries: List[Tuple[str, float, Dict[str, Any]]] = []
        self.slow_query_threshold = 1000.0  # 1 second, in milliseconds

    def record_query_time(self, query_id: str, execution_time_ms: float,
                          query_dict: Dict[str, Any]) -> None:
        """Record one query's execution time; log it if it was slow."""
        self.query_times.append(execution_time_ms)

        # Keep only recent times (last 1000 queries).
        if len(self.query_times) > 1000:
            self.query_times = self.query_times[-1000:]

        # Track slow queries separately for diagnostics.
        if execution_time_ms > self.slow_query_threshold:
            self.slow_queries.append((query_id, execution_time_ms, query_dict))
            logger.warning(f"Slow query {query_id}: {execution_time_ms:.2f}ms")

            # Keep only recent slow queries.
            if len(self.slow_queries) > 100:
                self.slow_queries = self.slow_queries[-100:]

    def get_performance_stats(self) -> Dict[str, Any]:
        """Return aggregate query-time statistics.

        Returns:
            ``{"status": "no_data"}`` when nothing has been recorded yet,
            otherwise a dict of count/mean/median/p95/slow-query figures.
        """
        if not self.query_times:
            return {"status": "no_data"}

        import statistics

        # BUGFIX: statistics.quantiles() raises StatisticsError with fewer
        # than two data points, so fall back to the single observed value.
        if len(self.query_times) >= 2:
            p95 = statistics.quantiles(self.query_times, n=20)[18]
        else:
            p95 = self.query_times[0]

        return {
            "total_queries": len(self.query_times),
            "avg_query_time_ms": round(statistics.mean(self.query_times), 2),
            "median_query_time_ms": round(statistics.median(self.query_times), 2),
            "p95_query_time_ms": round(p95, 2),
            "slow_query_count": len(self.slow_queries),
            "slow_query_threshold_ms": self.slow_query_threshold,
        }


class CalibrePerformanceCache:
    """Main performance caching system for Calibre Library API."""

    def __init__(self, config: Optional[Dict[str, Any]] = None):
        """Initialize performance cache system.

        Args:
            config: Optional sizing/TTL overrides (query_cache_size,
                metadata_cache_size, content_cache_size, content_ttl).
        """
        if config is None:
            config = {}

        self.query_cache = QueryCache(config.get("query_cache_size", 500))
        self.metadata_cache = MetadataCache(config.get("metadata_cache_size", 10000))
        self.monitor = PerformanceMonitor()

        # Content cache for frequently accessed book content.
        self.content_cache = LRUCache(
            config.get("content_cache_size", 100),
            default_ttl=config.get("content_ttl", 1800)  # 30 minutes
        )

        # Statistics cache for dashboard data.
        self.stats_cache = LRUCache(10, default_ttl=300)  # 5 minutes

        logger.info("Performance cache system initialized")

    def get_or_execute_query(self, query_key: str, query_dict: Dict[str, Any],
                             executor_func, *args, **kwargs) -> Any:
        """Return a cached result, or execute, time, and cache the query.

        Args:
            query_key: Stable cache key for the query.
            query_dict: Parsed query (used for pattern tracking and TTL).
            executor_func: Callable that runs the query on a cache miss.

        Returns:
            The cached or freshly computed query result.
        """
        # Check cache first.
        cached_result = self.query_cache.get_query_result(query_key, query_dict)
        if cached_result is not None:
            logger.debug(f"Cache hit for query {query_key[:8]}...")
            return cached_result

        # Execute query and measure wall-clock time.
        start_time = time.time()
        result = executor_func(*args, **kwargs)
        execution_time_ms = (time.time() - start_time) * 1000

        # Record performance metrics.
        self.monitor.record_query_time(query_key, execution_time_ms, query_dict)

        # Cache successful results only; result is assumed to carry a
        # status enum when it supports caching (duck-typed check).
        if hasattr(result, 'status') and result.status.value == "success":
            # Determine TTL based on query complexity: cheaper queries can
            # be cached longer because re-running them is less costly to
            # keep fresh relative to their hit value.
            complexity = query_dict.get("complexity", "moderate")
            ttl = {
                "simple": 1800,   # 30 minutes
                "moderate": 900,  # 15 minutes
                "complex": 300    # 5 minutes
            }.get(complexity, 600)

            self.query_cache.cache_query_result(query_key, query_dict, result, ttl)

        return result

    def get_book_metadata(self, book_id: int) -> Optional[Dict[str, Any]]:
        """Get cached book metadata."""
        return self.metadata_cache.get_book(book_id)

    def cache_book_metadata(self, book_id: int, metadata: Dict[str, Any]) -> None:
        """Cache book metadata."""
        self.metadata_cache.cache_book(book_id, metadata)

    def get_content(self, content_key: str) -> Optional[str]:
        """Get cached book content."""
        return self.content_cache.get(content_key)

    def cache_content(self, content_key: str, content: str,
                      ttl: Optional[int] = None) -> None:
        """Cache book content."""
        self.content_cache.put(content_key, content, ttl)

    def invalidate_book(self, book_id: int) -> None:
        """Invalidate all cached data for a book."""
        self.metadata_cache.book_cache.invalidate(f"book:{book_id}")
        # Could also invalidate related query patterns.

    def get_cache_stats(self) -> Dict[str, Any]:
        """Get comprehensive cache statistics across all sub-caches."""
        return {
            "query_cache": self.query_cache.cache.get_stats(),
            "metadata_cache": {
                "books": self.metadata_cache.book_cache.get_stats(),
                "authors": self.metadata_cache.author_cache.get_stats(),
                "series": self.metadata_cache.series_cache.get_stats()
            },
            "content_cache": self.content_cache.get_stats(),
            "performance": self.monitor.get_performance_stats()
        }

    def warm_cache(self, popular_queries: List[Dict[str, Any]]) -> None:
        """Pre-populate cache with popular queries.

        NOTE: intentionally a stub in SOURCE — implementation depends on
        usage patterns not visible here.
        """
        logger.info(f"Warming cache with {len(popular_queries)} popular queries")
        # This would be implemented based on usage patterns
        pass

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/pshap/mcp-neolibrarian'

If you have feedback or need assistance with the MCP directory API, please join our Discord server