
Personal Knowledge Assistant

by vitalune
cache.py (33 kB)
""" Secure Caching Layer with Encryption and Audit Trail This module provides comprehensive caching capabilities with security features: - Encrypted cache storage with data integrity verification - Configurable data retention policies and automatic cleanup - Audit trails for all cache operations and data access - Support for multiple cache backends (memory, file, hybrid) - Cache invalidation mechanisms and consistency guarantees - Privacy-compliant data handling with anonymization options """ import asyncio import json import time import hashlib from datetime import datetime, timezone, timedelta from pathlib import Path from typing import Dict, Any, Optional, Union, List, Tuple, TypeVar, Generic from dataclasses import dataclass, asdict from enum import Enum import threading from collections import OrderedDict import structlog from ..config.settings import get_settings from .encryption import get_encryption_manager, EncryptionManager logger = structlog.get_logger(__name__) T = TypeVar('T') class CacheBackend(str, Enum): """Supported cache backends""" MEMORY = "memory" FILE = "file" HYBRID = "hybrid" class CacheAccessType(str, Enum): """Types of cache access operations""" READ = "read" WRITE = "write" DELETE = "delete" INVALIDATE = "invalidate" CLEANUP = "cleanup" @dataclass class CacheEntry: """Cache entry with metadata""" key: str value: Any created_at: datetime expires_at: Optional[datetime] access_count: int last_accessed: datetime size_bytes: int tags: List[str] privacy_level: str checksum: str @dataclass class CacheAuditEntry: """Audit entry for cache operations""" timestamp: datetime operation: CacheAccessType cache_key: str user_context: Optional[str] ip_address: Optional[str] success: bool error_message: Optional[str] data_size: Optional[int] privacy_level: Optional[str] class LRUCache(Generic[T]): """Thread-safe LRU cache implementation""" def __init__(self, max_size: int): self.max_size = max_size self._cache: OrderedDict[str, T] = OrderedDict() self._lock = threading.RLock() def get(self, key: str) -> Optional[T]: """Get item from cache""" with self._lock: if key in self._cache: # Move to end (most recently used) value = self._cache.pop(key) self._cache[key] = value return value return None def put(self, key: str, value: T) -> None: """Put item in cache""" with self._lock: if key in self._cache: # Update existing item self._cache.pop(key) elif len(self._cache) >= self.max_size: # Remove least recently used item self._cache.popitem(last=False) self._cache[key] = value def remove(self, key: str) -> bool: """Remove item from cache""" with self._lock: if key in self._cache: del self._cache[key] return True return False def clear(self) -> None: """Clear all items from cache""" with self._lock: self._cache.clear() def size(self) -> int: """Get current cache size""" with self._lock: return len(self._cache) def keys(self) -> List[str]: """Get all cache keys""" with self._lock: return list(self._cache.keys()) class SecureCache: """Secure cache with encryption, audit trails, and privacy controls""" def __init__( self, backend: CacheBackend = CacheBackend.HYBRID, max_memory_entries: int = 1000, max_file_size_mb: int = 100, encryption_enabled: bool = True ): self.settings = get_settings() self.backend = backend self.encryption_enabled = encryption_enabled self.encryption_manager = get_encryption_manager() if encryption_enabled else None # Initialize backends self._memory_cache: LRUCache[CacheEntry] = LRUCache(max_memory_entries) self._file_cache_dir = 
self.settings.get_cache_path("secure_cache") self._file_cache_dir.mkdir(parents=True, exist_ok=True) # Cache metadata and statistics self._cache_stats = { "hits": 0, "misses": 0, "writes": 0, "deletes": 0, "cleanups": 0, "total_size_bytes": 0, } # Audit logging self._audit_logger = structlog.get_logger("cache_audit") self._audit_entries: List[CacheAuditEntry] = [] # Background cleanup task self._cleanup_task: Optional[asyncio.Task] = None self._start_background_cleanup() def _start_background_cleanup(self): """Start background cleanup task""" try: loop = asyncio.get_event_loop() if not loop.is_closed(): self._cleanup_task = loop.create_task(self._background_cleanup()) except RuntimeError: # No event loop running, cleanup will be manual logger.info("No event loop available, background cleanup disabled") async def _background_cleanup(self): """Background task for cache cleanup and maintenance""" while True: try: await asyncio.sleep(3600) # Run every hour await self.cleanup_expired() await self._rotate_audit_logs() except asyncio.CancelledError: break except Exception as e: logger.error("Background cleanup failed", error=str(e)) def _calculate_entry_size(self, value: Any) -> int: """Calculate size of cache entry in bytes""" try: if isinstance(value, (str, bytes)): return len(value.encode() if isinstance(value, str) else value) else: return len(json.dumps(value, ensure_ascii=False).encode()) except Exception: return 0 def _generate_cache_key_hash(self, key: str) -> str: """Generate hash of cache key for file storage""" return hashlib.sha256(key.encode()).hexdigest() def _calculate_checksum(self, data: Any) -> str: """Calculate checksum for data integrity""" try: if isinstance(data, bytes): content = data elif isinstance(data, str): content = data.encode() else: content = json.dumps(data, ensure_ascii=False, sort_keys=True).encode() return hashlib.sha256(content).hexdigest()[:16] except Exception: return "invalid" def _get_file_path(self, key: str) -> Path: """Get file path for cache key""" key_hash = self._generate_cache_key_hash(key) return self._file_cache_dir / f"{key_hash}.cache" async def _audit_log( self, operation: CacheAccessType, cache_key: str, success: bool, user_context: Optional[str] = None, ip_address: Optional[str] = None, error_message: Optional[str] = None, data_size: Optional[int] = None, privacy_level: Optional[str] = None ): """Log cache operation for audit trail""" if not self.settings.audit.audit_enabled: return # Anonymize IP address if required if ip_address and self.settings.privacy.anonymize_logs: ip_parts = ip_address.split('.') if len(ip_parts) == 4: ip_address = f"{ip_parts[0]}.{ip_parts[1]}.xxx.xxx" audit_entry = CacheAuditEntry( timestamp=datetime.now(timezone.utc), operation=operation, cache_key=cache_key[:50] + "..." 
if len(cache_key) > 50 else cache_key, # Truncate long keys user_context=user_context, ip_address=ip_address, success=success, error_message=error_message, data_size=data_size, privacy_level=privacy_level ) self._audit_entries.append(audit_entry) # Log to structured logger self._audit_logger.info( "Cache operation", operation=operation.value, cache_key=audit_entry.cache_key, success=success, data_size=data_size, privacy_level=privacy_level ) # Rotate audit logs if they get too large if len(self._audit_entries) > 10000: await self._rotate_audit_logs() async def _rotate_audit_logs(self): """Rotate and save audit logs""" if not self._audit_entries: return try: # Save current audit entries to file audit_file = self.settings.get_log_path(f"cache_audit_{int(time.time())}.json") audit_data = [asdict(entry) for entry in self._audit_entries] if self.encryption_enabled and self.encryption_manager: # Encrypt audit logs success = self.encryption_manager.secure_storage.encrypt_file(audit_file, audit_data) if not success: logger.error("Failed to encrypt audit logs") else: # Save unencrypted with open(audit_file, 'w') as f: json.dump(audit_data, f, indent=2, default=str) # Clear in-memory audit entries self._audit_entries.clear() logger.info(f"Audit logs rotated", file=str(audit_file), entries=len(audit_data)) except Exception as e: logger.error("Failed to rotate audit logs", error=str(e)) async def get( self, key: str, user_context: Optional[str] = None, ip_address: Optional[str] = None ) -> Optional[Any]: """Get value from cache""" start_time = time.time() try: # Try memory cache first for hybrid backend if self.backend in [CacheBackend.MEMORY, CacheBackend.HYBRID]: entry = self._memory_cache.get(key) if entry: # Check expiration if entry.expires_at and entry.expires_at < datetime.now(timezone.utc): self._memory_cache.remove(key) await self._audit_log(CacheAccessType.READ, key, False, user_context, ip_address, "Expired entry") self._cache_stats["misses"] += 1 return None # Update access metadata entry.access_count += 1 entry.last_accessed = datetime.now(timezone.utc) self._cache_stats["hits"] += 1 await self._audit_log(CacheAccessType.READ, key, True, user_context, ip_address, data_size=entry.size_bytes, privacy_level=entry.privacy_level) return entry.value # Try file cache for file or hybrid backend if self.backend in [CacheBackend.FILE, CacheBackend.HYBRID]: file_path = self._get_file_path(key) if file_path.exists(): try: if self.encryption_enabled and self.encryption_manager: # Decrypt file entry_data = self.encryption_manager.secure_storage.decrypt_file(file_path, return_dict=True) else: # Load unencrypted with open(file_path, 'r') as f: entry_data = json.load(f) if not entry_data: raise ValueError("Failed to load cache entry") # Reconstruct cache entry entry = CacheEntry( key=entry_data["key"], value=entry_data["value"], created_at=datetime.fromisoformat(entry_data["created_at"]), expires_at=datetime.fromisoformat(entry_data["expires_at"]) if entry_data["expires_at"] else None, access_count=entry_data["access_count"], last_accessed=datetime.fromisoformat(entry_data["last_accessed"]), size_bytes=entry_data["size_bytes"], tags=entry_data["tags"], privacy_level=entry_data["privacy_level"], checksum=entry_data["checksum"] ) # Verify checksum calculated_checksum = self._calculate_checksum(entry.value) if calculated_checksum != entry.checksum: logger.warning(f"Cache entry checksum mismatch", key=key) file_path.unlink() # Remove corrupted entry await self._audit_log(CacheAccessType.READ, key, False, 
user_context, ip_address, "Checksum mismatch") self._cache_stats["misses"] += 1 return None # Check expiration if entry.expires_at and entry.expires_at < datetime.now(timezone.utc): file_path.unlink() # Remove expired entry await self._audit_log(CacheAccessType.READ, key, False, user_context, ip_address, "Expired entry") self._cache_stats["misses"] += 1 return None # Update access metadata and save back entry.access_count += 1 entry.last_accessed = datetime.now(timezone.utc) # Also add to memory cache if using hybrid backend if self.backend == CacheBackend.HYBRID: self._memory_cache.put(key, entry) # Update file await self._save_file_entry(key, entry) self._cache_stats["hits"] += 1 await self._audit_log(CacheAccessType.READ, key, True, user_context, ip_address, data_size=entry.size_bytes, privacy_level=entry.privacy_level) return entry.value except Exception as e: logger.error(f"Failed to load cache entry from file", key=key, error=str(e)) # Remove corrupted file if file_path.exists(): file_path.unlink() # Cache miss self._cache_stats["misses"] += 1 await self._audit_log(CacheAccessType.READ, key, False, user_context, ip_address, "Cache miss") return None except Exception as e: logger.error(f"Cache get operation failed", key=key, error=str(e)) await self._audit_log(CacheAccessType.READ, key, False, user_context, ip_address, str(e)) self._cache_stats["misses"] += 1 return None async def _save_file_entry(self, key: str, entry: CacheEntry): """Save cache entry to file""" file_path = self._get_file_path(key) entry_data = { "key": entry.key, "value": entry.value, "created_at": entry.created_at.isoformat(), "expires_at": entry.expires_at.isoformat() if entry.expires_at else None, "access_count": entry.access_count, "last_accessed": entry.last_accessed.isoformat(), "size_bytes": entry.size_bytes, "tags": entry.tags, "privacy_level": entry.privacy_level, "checksum": entry.checksum } if self.encryption_enabled and self.encryption_manager: # Encrypt and save success = self.encryption_manager.secure_storage.encrypt_file(file_path, entry_data) if not success: raise ValueError("Failed to encrypt cache entry") else: # Save unencrypted with open(file_path, 'w') as f: json.dump(entry_data, f, indent=2, default=str) async def put( self, key: str, value: Any, ttl_seconds: Optional[int] = None, tags: Optional[List[str]] = None, privacy_level: str = "standard", user_context: Optional[str] = None, ip_address: Optional[str] = None ) -> bool: """Put value in cache""" try: # Calculate expiration time expires_at = None if ttl_seconds: expires_at = datetime.now(timezone.utc) + timedelta(seconds=ttl_seconds) elif self.settings.privacy.cache_retention_days > 0: expires_at = datetime.now(timezone.utc) + timedelta(days=self.settings.privacy.cache_retention_days) # Calculate entry size and checksum size_bytes = self._calculate_entry_size(value) checksum = self._calculate_checksum(value) # Create cache entry entry = CacheEntry( key=key, value=value, created_at=datetime.now(timezone.utc), expires_at=expires_at, access_count=0, last_accessed=datetime.now(timezone.utc), size_bytes=size_bytes, tags=tags or [], privacy_level=privacy_level, checksum=checksum ) # Store in memory cache if self.backend in [CacheBackend.MEMORY, CacheBackend.HYBRID]: self._memory_cache.put(key, entry) # Store in file cache if self.backend in [CacheBackend.FILE, CacheBackend.HYBRID]: await self._save_file_entry(key, entry) # Update statistics self._cache_stats["writes"] += 1 self._cache_stats["total_size_bytes"] += size_bytes await 
self._audit_log(CacheAccessType.WRITE, key, True, user_context, ip_address, data_size=size_bytes, privacy_level=privacy_level) logger.debug(f"Cache entry stored", key=key, size=size_bytes, expires_at=expires_at) return True except Exception as e: logger.error(f"Cache put operation failed", key=key, error=str(e)) await self._audit_log(CacheAccessType.WRITE, key, False, user_context, ip_address, str(e)) return False async def delete( self, key: str, user_context: Optional[str] = None, ip_address: Optional[str] = None ) -> bool: """Delete value from cache""" try: deleted = False # Remove from memory cache if self.backend in [CacheBackend.MEMORY, CacheBackend.HYBRID]: if self._memory_cache.remove(key): deleted = True # Remove from file cache if self.backend in [CacheBackend.FILE, CacheBackend.HYBRID]: file_path = self._get_file_path(key) if file_path.exists(): if self.encryption_enabled and self.encryption_manager: # Secure delete self.encryption_manager.secure_delete_file(file_path) else: file_path.unlink() deleted = True if deleted: self._cache_stats["deletes"] += 1 await self._audit_log(CacheAccessType.DELETE, key, True, user_context, ip_address) else: await self._audit_log(CacheAccessType.DELETE, key, False, user_context, ip_address, "Key not found") return deleted except Exception as e: logger.error(f"Cache delete operation failed", key=key, error=str(e)) await self._audit_log(CacheAccessType.DELETE, key, False, user_context, ip_address, str(e)) return False async def invalidate_by_tags( self, tags: List[str], user_context: Optional[str] = None, ip_address: Optional[str] = None ) -> int: """Invalidate cache entries by tags""" invalidated_count = 0 try: # Get all keys to check keys_to_check = [] # Check memory cache if self.backend in [CacheBackend.MEMORY, CacheBackend.HYBRID]: keys_to_check.extend(self._memory_cache.keys()) # Check file cache if self.backend in [CacheBackend.FILE, CacheBackend.HYBRID]: for file_path in self._file_cache_dir.glob("*.cache"): key_hash = file_path.stem # We can't easily reverse the hash, so we'll need to load each file # This is not efficient for large caches - consider indexing by tags keys_to_check.append(key_hash) # Check each entry for matching tags for key in keys_to_check: try: # Get entry to check tags entry = None if self.backend in [CacheBackend.MEMORY, CacheBackend.HYBRID]: entry = self._memory_cache.get(key) if not entry and self.backend in [CacheBackend.FILE, CacheBackend.HYBRID]: # Try to load from file file_path = self._get_file_path(key) if len(key) > 50 else self._file_cache_dir / f"{key}.cache" if file_path.exists(): try: if self.encryption_enabled and self.encryption_manager: entry_data = self.encryption_manager.secure_storage.decrypt_file(file_path, return_dict=True) else: with open(file_path, 'r') as f: entry_data = json.load(f) if entry_data: entry = CacheEntry( key=entry_data["key"], value=entry_data["value"], created_at=datetime.fromisoformat(entry_data["created_at"]), expires_at=datetime.fromisoformat(entry_data["expires_at"]) if entry_data["expires_at"] else None, access_count=entry_data["access_count"], last_accessed=datetime.fromisoformat(entry_data["last_accessed"]), size_bytes=entry_data["size_bytes"], tags=entry_data["tags"], privacy_level=entry_data["privacy_level"], checksum=entry_data["checksum"] ) except Exception: continue # Check if entry has any of the specified tags if entry and any(tag in entry.tags for tag in tags): await self.delete(entry.key, user_context, ip_address) invalidated_count += 1 except Exception as e: 
logger.error(f"Error checking cache entry for tag invalidation", key=key, error=str(e)) continue await self._audit_log(CacheAccessType.INVALIDATE, f"tags:{','.join(tags)}", True, user_context, ip_address, data_size=invalidated_count) logger.info(f"Invalidated cache entries by tags", tags=tags, count=invalidated_count) return invalidated_count except Exception as e: logger.error(f"Cache invalidation by tags failed", tags=tags, error=str(e)) await self._audit_log(CacheAccessType.INVALIDATE, f"tags:{','.join(tags)}", False, user_context, ip_address, error_message=str(e)) return 0 async def cleanup_expired( self, user_context: Optional[str] = None ) -> int: """Clean up expired cache entries""" cleaned_count = 0 current_time = datetime.now(timezone.utc) try: # Clean memory cache if self.backend in [CacheBackend.MEMORY, CacheBackend.HYBRID]: keys_to_remove = [] for key in self._memory_cache.keys(): entry = self._memory_cache.get(key) if entry and entry.expires_at and entry.expires_at < current_time: keys_to_remove.append(key) for key in keys_to_remove: self._memory_cache.remove(key) cleaned_count += 1 # Clean file cache if self.backend in [CacheBackend.FILE, CacheBackend.HYBRID]: for file_path in self._file_cache_dir.glob("*.cache"): try: if self.encryption_enabled and self.encryption_manager: entry_data = self.encryption_manager.secure_storage.decrypt_file(file_path, return_dict=True) else: with open(file_path, 'r') as f: entry_data = json.load(f) if entry_data and entry_data.get("expires_at"): expires_at = datetime.fromisoformat(entry_data["expires_at"]) if expires_at < current_time: if self.encryption_enabled and self.encryption_manager: self.encryption_manager.secure_delete_file(file_path) else: file_path.unlink() cleaned_count += 1 except Exception as e: logger.error(f"Error cleaning expired cache file", file=str(file_path), error=str(e)) # Remove corrupted files try: file_path.unlink() cleaned_count += 1 except Exception: pass self._cache_stats["cleanups"] += 1 await self._audit_log(CacheAccessType.CLEANUP, "expired_entries", True, user_context, data_size=cleaned_count) if cleaned_count > 0: logger.info(f"Cleaned up expired cache entries", count=cleaned_count) return cleaned_count except Exception as e: logger.error(f"Cache cleanup failed", error=str(e)) await self._audit_log(CacheAccessType.CLEANUP, "expired_entries", False, user_context, error_message=str(e)) return 0 async def clear_all( self, user_context: Optional[str] = None, ip_address: Optional[str] = None ) -> bool: """Clear all cache entries""" try: # Clear memory cache if self.backend in [CacheBackend.MEMORY, CacheBackend.HYBRID]: self._memory_cache.clear() # Clear file cache if self.backend in [CacheBackend.FILE, CacheBackend.HYBRID]: for file_path in self._file_cache_dir.glob("*.cache"): if self.encryption_enabled and self.encryption_manager: self.encryption_manager.secure_delete_file(file_path) else: file_path.unlink() # Reset statistics self._cache_stats = { "hits": 0, "misses": 0, "writes": 0, "deletes": 0, "cleanups": 0, "total_size_bytes": 0, } await self._audit_log(CacheAccessType.DELETE, "all_entries", True, user_context, ip_address) logger.info("All cache entries cleared") return True except Exception as e: logger.error(f"Cache clear all failed", error=str(e)) await self._audit_log(CacheAccessType.DELETE, "all_entries", False, user_context, ip_address, str(e)) return False def get_stats(self) -> Dict[str, Any]: """Get cache statistics""" memory_size = self._memory_cache.size() if self.backend in [CacheBackend.MEMORY, 
CacheBackend.HYBRID] else 0 file_count = 0 if self.backend in [CacheBackend.FILE, CacheBackend.HYBRID]: file_count = len(list(self._file_cache_dir.glob("*.cache"))) total_entries = memory_size + (file_count if self.backend == CacheBackend.FILE else 0) hit_rate = 0 if self._cache_stats["hits"] + self._cache_stats["misses"] > 0: hit_rate = self._cache_stats["hits"] / (self._cache_stats["hits"] + self._cache_stats["misses"]) return { "backend": self.backend.value, "total_entries": total_entries, "memory_entries": memory_size, "file_entries": file_count, "hit_rate": hit_rate, "encryption_enabled": self.encryption_enabled, **self._cache_stats } async def get_health_status(self) -> Dict[str, Any]: """Get cache health status""" stats = self.get_stats() # Check for issues issues = [] if stats["hit_rate"] < 0.5 and stats["hits"] + stats["misses"] > 100: issues.append("Low cache hit rate") if self.backend in [CacheBackend.FILE, CacheBackend.HYBRID]: if not self._file_cache_dir.exists(): issues.append("Cache directory not accessible") return { "status": "healthy" if not issues else "warning", "issues": issues, "stats": stats, "cache_directory": str(self._file_cache_dir) if self.backend in [CacheBackend.FILE, CacheBackend.HYBRID] else None } async def shutdown(self): """Shutdown cache and cleanup resources""" try: # Cancel background cleanup task if self._cleanup_task and not self._cleanup_task.done(): self._cleanup_task.cancel() try: await self._cleanup_task except asyncio.CancelledError: pass # Save audit logs await self._rotate_audit_logs() logger.info("Cache shutdown completed") except Exception as e: logger.error(f"Cache shutdown failed", error=str(e)) # Global cache instance secure_cache: Optional[SecureCache] = None def get_secure_cache() -> SecureCache: """Get secure cache singleton""" global secure_cache if secure_cache is None: settings = get_settings() secure_cache = SecureCache( backend=CacheBackend.HYBRID, max_memory_entries=1000, max_file_size_mb=settings.database.max_cache_size_mb, encryption_enabled=settings.database.encrypt_database ) return secure_cache
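
The LRUCache class above is the in-memory tier of the hybrid backend: on every get, the entry is popped and re-appended so the OrderedDict stays sorted from least to most recently used, and put evicts from the front when the cache is full. A minimal sketch of that eviction order; the import path and the toy keys are illustrative, not part of the listing:

# Demonstrates LRUCache eviction order. Assumes the surrounding package is
# installed so the module is importable; "app.utils.cache" is a hypothetical path.
from app.utils.cache import LRUCache

lru: LRUCache[int] = LRUCache(max_size=2)
lru.put("a", 1)
lru.put("b", 2)
lru.get("a")      # touching "a" makes it most recently used
lru.put("c", 3)   # cache is full, so "b" (least recently used) is evicted
assert lru.keys() == ["a", "c"]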
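The integrity checksum stored on each CacheEntry is straightforward to reproduce outside the class: SHA-256 over the canonical JSON form of the value (sorted keys, no ASCII escaping), truncated to the first 16 hex characters. Sorting the keys makes the checksum independent of dict insertion order. Note that a truncated, unkeyed hash detects accidental corruption, not deliberate tampering. A self-contained sketch:

# Mirrors SecureCache._calculate_checksum for non-str/bytes values.
import hashlib
import json

def checksum(value) -> str:
    content = json.dumps(value, ensure_ascii=False, sort_keys=True).encode()
    return hashlib.sha256(content).hexdigest()[:16]

# Key order does not change the checksum, because keys are canonicalized:
print(checksum({"b": 2, "a": 1}) == checksum({"a": 1, "b": 2}))  # True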
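Putting it together, a typical caller would go through the get_secure_cache() singleton at the end of the file. A minimal usage sketch, assuming the package's settings and encryption manager are configured; the import path, key names, and values here are hypothetical:

# Usage sketch for SecureCache via the module's singleton accessor.
import asyncio
from app.utils.cache import get_secure_cache  # hypothetical package path

async def main() -> None:
    cache = get_secure_cache()

    # Write with a 5-minute TTL and a tag we can invalidate on later
    await cache.put("user:42:profile", {"name": "Ada"},
                    ttl_seconds=300, tags=["user:42"])

    profile = await cache.get("user:42:profile")  # -> {"name": "Ada"}

    # Drop every entry carrying this tag in one call
    await cache.invalidate_by_tags(["user:42"])

    print(cache.get_stats())  # backend, hit_rate, hit/miss counters, ...
    await cache.shutdown()    # cancels background cleanup, flushes audit log

asyncio.run(main())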
