# performance_cache.py
"""Performance Caching Layer for Calibre Library API
Provides intelligent caching for frequent queries and operations
to improve response times for large libraries (107k+ books).
"""
import time
import json
import hashlib
import logging
from typing import Any, Dict, Optional, List, Tuple
from collections import OrderedDict
from dataclasses import dataclass, asdict
from threading import RLock
from pathlib import Path
logger = logging.getLogger(__name__)
@dataclass
class CacheEntry:
    """A single cached value plus its bookkeeping metadata."""

    data: Any  # the cached payload
    created_at: float  # epoch seconds when the entry was stored
    access_count: int  # number of cache hits served by this entry
    last_accessed: float  # epoch seconds of the most recent hit
    ttl_seconds: Optional[int] = None  # None means the entry never expires

    def is_expired(self) -> bool:
        """Return True once the entry's age exceeds its TTL (never, if TTL is None)."""
        if self.ttl_seconds is None:
            return False
        age = time.time() - self.created_at
        return age > self.ttl_seconds

    def touch(self) -> None:
        """Record a hit: bump the access counter and refresh the access timestamp."""
        self.access_count += 1
        self.last_accessed = time.time()
class LRUCache:
    """Thread-safe LRU cache with per-entry TTL support."""

    def __init__(self, max_size: int = 1000, default_ttl: int = 300):
        """Initialize LRU cache.

        Args:
            max_size: Maximum number of entries held at once.
            default_ttl: Default time-to-live in seconds for new entries.
        """
        self.max_size = max_size
        self.default_ttl = default_ttl
        self._cache: OrderedDict[str, CacheEntry] = OrderedDict()
        # RLock so a thread already holding the lock can re-enter safely.
        self._lock = RLock()
        self._hits = 0
        self._misses = 0

    def get(self, key: str) -> Optional[Any]:
        """Return the cached value for *key*, or None on miss or expiry."""
        with self._lock:
            if key not in self._cache:
                self._misses += 1
                return None
            entry = self._cache[key]
            if entry.is_expired():
                # Expired entries are dropped eagerly and count as misses.
                del self._cache[key]
                self._misses += 1
                return None
            # Mark as most recently used.
            self._cache.move_to_end(key)
            entry.touch()
            self._hits += 1
            return entry.data

    def put(self, key: str, value: Any, ttl: Optional[int] = None) -> None:
        """Store *value* under *key*, evicting LRU entries if at capacity.

        Args:
            key: Cache key.
            value: Value to store.
            ttl: Time-to-live in seconds; falls back to ``default_ttl``.
        """
        with self._lock:
            if ttl is None:
                ttl = self.default_ttl
            # BUGFIX: remove an existing entry for this key first, so that
            # overwriting a key while at capacity does not needlessly evict
            # an unrelated (least recently used) entry.
            if key in self._cache:
                del self._cache[key]
            # Evict least-recently-used entries until there is room.
            while len(self._cache) >= self.max_size:
                self._cache.popitem(last=False)
            now = time.time()
            self._cache[key] = CacheEntry(
                data=value,
                created_at=now,
                access_count=0,
                last_accessed=now,
                ttl_seconds=ttl,
            )

    def invalidate(self, key: str) -> bool:
        """Remove *key* from the cache; return True if it was present."""
        with self._lock:
            if key in self._cache:
                del self._cache[key]
                return True
            return False

    def clear(self) -> None:
        """Drop every cached entry and reset hit/miss counters."""
        with self._lock:
            self._cache.clear()
            self._hits = 0
            self._misses = 0

    def get_stats(self) -> Dict[str, Any]:
        """Return hit/miss counts, hit rate (percent), and size information."""
        with self._lock:
            total_requests = self._hits + self._misses
            hit_rate = (self._hits / total_requests * 100) if total_requests > 0 else 0
            return {
                "hits": self._hits,
                "misses": self._misses,
                "total_requests": total_requests,
                "hit_rate_percent": round(hit_rate, 2),
                "current_size": len(self._cache),
                "max_size": self.max_size,
            }
class QueryCache:
    """Specialized cache for query results with pattern-based invalidation."""

    def __init__(self, max_size: int = 500):
        """Initialize query cache (entries live 10 minutes by default)."""
        self.cache = LRUCache(max_size, default_ttl=600)  # 10 minute TTL
        # Pattern -> set of query keys that matched it.  Sets give O(1)
        # membership/insert instead of the original's O(n) list scans.
        self._query_patterns: Dict[str, set] = {}

    def _extract_query_patterns(self, query_dict: Dict[str, Any]) -> List[str]:
        """Derive invalidation patterns (author/series/text) from a query dict."""
        patterns = []
        metadata = query_dict.get("metadata_filters", {})
        # Author-based patterns
        if metadata.get("authors"):
            for author in metadata["authors"]:
                patterns.append(f"author:{author.lower()}")
        # Series-based patterns
        if metadata.get("series"):
            for series in metadata["series"]:
                patterns.append(f"series:{series.lower()}")
        # Text search patterns
        if query_dict.get("text_search", {}).get("query"):
            query_text = query_dict["text_search"]["query"].lower()
            patterns.append(f"text:{query_text}")
        return patterns

    def _track_patterns(self, query_key: str, query_dict: Dict[str, Any]) -> None:
        """Record that *query_key* matches each pattern in *query_dict*.

        Shared by get and cache paths (the original duplicated this logic).
        """
        for pattern in self._extract_query_patterns(query_dict):
            self._query_patterns.setdefault(pattern, set()).add(query_key)

    def get_query_result(self, query_key: str, query_dict: Dict[str, Any]) -> Optional[Any]:
        """Return the cached result for *query_key*, refreshing pattern tracking on a hit."""
        result = self.cache.get(query_key)
        if result is not None:
            self._track_patterns(query_key, query_dict)
        return result

    def cache_query_result(self, query_key: str, query_dict: Dict[str, Any],
                           result: Any, ttl: Optional[int] = None) -> None:
        """Cache a query result and track its invalidation patterns."""
        self.cache.put(query_key, result, ttl)
        self._track_patterns(query_key, query_dict)

    def invalidate_by_pattern(self, pattern: str) -> int:
        """Invalidate every cached query recorded under *pattern*.

        Returns:
            Number of entries actually removed from the cache.
        """
        keys = self._query_patterns.pop(pattern, None)
        if not keys:
            return 0
        return sum(1 for query_key in keys if self.cache.invalidate(query_key))
class MetadataCache:
    """Caches for frequently accessed book, author, and series metadata."""

    def __init__(self, max_size: int = 10000):
        """Create the three underlying LRU caches.

        Books get the full capacity with a 1-hour TTL; author and series
        lists get a tenth of the capacity with a 30-minute TTL.
        """
        self.book_cache = LRUCache(max_size, default_ttl=3600)
        self.author_cache = LRUCache(max_size // 10, default_ttl=1800)
        self.series_cache = LRUCache(max_size // 10, default_ttl=1800)

    @staticmethod
    def _book_key(book_id: int) -> str:
        """Cache key for a book id."""
        return f"book:{book_id}"

    @staticmethod
    def _author_key(author: str) -> str:
        """Cache key for an author name (case-insensitive)."""
        return f"author:{author.lower()}"

    @staticmethod
    def _series_key(series: str) -> str:
        """Cache key for a series name (case-insensitive)."""
        return f"series:{series.lower()}"

    def get_book(self, book_id: int) -> Optional[Dict[str, Any]]:
        """Return cached metadata for *book_id*, if present."""
        return self.book_cache.get(self._book_key(book_id))

    def cache_book(self, book_id: int, metadata: Dict[str, Any]) -> None:
        """Store metadata for *book_id*."""
        self.book_cache.put(self._book_key(book_id), metadata)

    def get_author_books(self, author: str) -> Optional[List[int]]:
        """Return the cached list of book IDs for *author*, if present."""
        return self.author_cache.get(self._author_key(author))

    def cache_author_books(self, author: str, book_ids: List[int]) -> None:
        """Store the list of book IDs for *author*."""
        self.author_cache.put(self._author_key(author), book_ids)

    def get_series_books(self, series: str) -> Optional[List[int]]:
        """Return the cached list of book IDs for *series*, if present."""
        return self.series_cache.get(self._series_key(series))

    def cache_series_books(self, series: str, book_ids: List[int]) -> None:
        """Store the list of book IDs for *series*."""
        self.series_cache.put(self._series_key(series), book_ids)
class PerformanceMonitor:
    """Monitor and log query performance metrics."""

    def __init__(self):
        """Initialize performance monitor."""
        # Rolling window of the most recent query times, in milliseconds.
        self.query_times: List[float] = []
        self.slow_queries: List[Tuple[str, float, Dict[str, Any]]] = []
        self.slow_query_threshold = 1000.0  # 1 second, in milliseconds

    def record_query_time(self, query_id: str, execution_time_ms: float,
                          query_dict: Dict[str, Any]) -> None:
        """Record one query's execution time; track it as slow if over threshold.

        Args:
            query_id: Identifier used when logging a slow query.
            execution_time_ms: Wall-clock execution time in milliseconds.
            query_dict: The query definition, kept with slow-query records.
        """
        self.query_times.append(execution_time_ms)
        # Keep only the most recent 1000 samples.
        if len(self.query_times) > 1000:
            self.query_times = self.query_times[-1000:]
        if execution_time_ms > self.slow_query_threshold:
            self.slow_queries.append((query_id, execution_time_ms, query_dict))
            logger.warning(f"Slow query {query_id}: {execution_time_ms:.2f}ms")
            # Keep only the most recent 100 slow queries.
            if len(self.slow_queries) > 100:
                self.slow_queries = self.slow_queries[-100:]

    def get_performance_stats(self) -> Dict[str, Any]:
        """Return aggregate performance statistics.

        Returns:
            ``{"status": "no_data"}`` when no queries were recorded;
            otherwise a dict of count, mean, median, p95, and slow-query info.
        """
        if not self.query_times:
            return {"status": "no_data"}
        import statistics
        times = self.query_times
        # BUGFIX: statistics.quantiles() raises StatisticsError for fewer
        # than two samples; fall back to the single observed time for p95.
        if len(times) >= 2:
            p95 = statistics.quantiles(times, n=20)[18]
        else:
            p95 = times[0]
        return {
            "total_queries": len(times),
            "avg_query_time_ms": round(statistics.mean(times), 2),
            "median_query_time_ms": round(statistics.median(times), 2),
            "p95_query_time_ms": round(p95, 2),
            "slow_query_count": len(self.slow_queries),
            "slow_query_threshold_ms": self.slow_query_threshold,
        }
class CalibrePerformanceCache:
    """Top-level performance caching facade for the Calibre Library API.

    Bundles the query cache, metadata caches, content cache, stats cache,
    and the performance monitor behind a single object.
    """

    # TTL (seconds) per query complexity; unknown complexity falls back to 600.
    _COMPLEXITY_TTL = {
        "simple": 1800,   # 30 minutes
        "moderate": 900,  # 15 minutes
        "complex": 300,   # 5 minutes
    }

    def __init__(self, config: Optional[Dict[str, Any]] = None):
        """Initialize all cache subsystems from an optional config dict."""
        config = config or {}
        self.query_cache = QueryCache(config.get("query_cache_size", 500))
        self.metadata_cache = MetadataCache(config.get("metadata_cache_size", 10000))
        self.monitor = PerformanceMonitor()
        # Frequently accessed book content (default: 100 entries, 30 min TTL).
        self.content_cache = LRUCache(
            config.get("content_cache_size", 100),
            default_ttl=config.get("content_ttl", 1800),
        )
        # Dashboard statistics: small and short-lived (10 entries, 5 min TTL).
        self.stats_cache = LRUCache(10, default_ttl=300)
        logger.info("Performance cache system initialized")

    def get_or_execute_query(self, query_key: str, query_dict: Dict[str, Any],
                             executor_func, *args, **kwargs) -> Any:
        """Return a cached result for the query, or execute and cache it.

        Execution time is always recorded with the monitor; only results
        reporting a "success" status are cached.
        """
        cached = self.query_cache.get_query_result(query_key, query_dict)
        if cached is not None:
            logger.debug(f"Cache hit for query {query_key[:8]}...")
            return cached

        started = time.time()
        result = executor_func(*args, **kwargs)
        elapsed_ms = (time.time() - started) * 1000
        self.monitor.record_query_time(query_key, elapsed_ms, query_dict)

        # Cache successful results; TTL shrinks as query complexity grows.
        if hasattr(result, 'status') and result.status.value == "success":
            complexity = query_dict.get("complexity", "moderate")
            ttl = self._COMPLEXITY_TTL.get(complexity, 600)
            self.query_cache.cache_query_result(query_key, query_dict, result, ttl)
        return result

    def get_book_metadata(self, book_id: int) -> Optional[Dict[str, Any]]:
        """Return cached metadata for *book_id*, if present."""
        return self.metadata_cache.get_book(book_id)

    def cache_book_metadata(self, book_id: int, metadata: Dict[str, Any]) -> None:
        """Store metadata for *book_id*."""
        self.metadata_cache.cache_book(book_id, metadata)

    def get_content(self, content_key: str) -> Optional[str]:
        """Return cached book content for *content_key*, if present."""
        return self.content_cache.get(content_key)

    def cache_content(self, content_key: str, content: str, ttl: Optional[int] = None) -> None:
        """Store book content under *content_key*."""
        self.content_cache.put(content_key, content, ttl)

    def invalidate_book(self, book_id: int) -> None:
        """Drop all cached data for one book.

        NOTE(review): only the metadata entry is invalidated today; related
        query-cache patterns are intentionally left untouched.
        """
        self.metadata_cache.book_cache.invalidate(f"book:{book_id}")

    def get_cache_stats(self) -> Dict[str, Any]:
        """Return statistics for every cache plus the performance monitor."""
        return {
            "query_cache": self.query_cache.cache.get_stats(),
            "metadata_cache": {
                "books": self.metadata_cache.book_cache.get_stats(),
                "authors": self.metadata_cache.author_cache.get_stats(),
                "series": self.metadata_cache.series_cache.get_stats(),
            },
            "content_cache": self.content_cache.get_stats(),
            "performance": self.monitor.get_performance_stats(),
        }

    def warm_cache(self, popular_queries: List[Dict[str, Any]]) -> None:
        """Pre-populate the cache with popular queries (placeholder).

        Implementation is deferred until real usage patterns are available.
        """
        logger.info(f"Warming cache with {len(popular_queries)} popular queries")