Skip to main content
Glama

Smart Code Search MCP Server

cache_manager.py15.2 kB
""" Cache Manager for Sub-Agent Results Enables 10-100x performance improvement through intelligent caching """ import hashlib import json import time from typing import Any, Dict, Optional, Callable, List, Tuple from datetime import datetime, timedelta from pathlib import Path import sqlite3 from threading import Lock from .db_wrapper import ThreadSafeDB class CacheStats: """Track cache performance statistics""" def __init__(self): self.hits = 0 self.misses = 0 self.total_saved_ms = 0 self.cache_size_bytes = 0 self.evictions = 0 @property def hit_rate(self) -> float: total = self.hits + self.misses return self.hits / total if total > 0 else 0.0 @property def avg_saved_ms(self) -> float: return self.total_saved_ms / self.hits if self.hits > 0 else 0.0 def to_dict(self) -> Dict: return { 'hits': self.hits, 'misses': self.misses, 'hit_rate': f"{self.hit_rate:.1%}", 'total_saved_ms': self.total_saved_ms, 'avg_saved_ms': self.avg_saved_ms, 'cache_size_bytes': self.cache_size_bytes, 'evictions': self.evictions } class CacheManager: """ Intelligent caching system for agent execution results Provides multi-level caching with TTL, warming, and invalidation """ def __init__(self, db: ThreadSafeDB, ttl_seconds: int = 3600, max_size_mb: int = 100, warm_on_startup: bool = True): """ Initialize cache manager Args: db: Thread-safe database connection ttl_seconds: Time-to-live for cache entries max_size_mb: Maximum cache size in megabytes warm_on_startup: Whether to preload common patterns """ self.db = db self.ttl = timedelta(seconds=ttl_seconds) self.max_size_bytes = max_size_mb * 1024 * 1024 self.stats = CacheStats() self._memory_cache = {} # In-memory L1 cache self._cache_lock = Lock() # Initialize cache tables self.init_cache_tables() # Warm cache if requested if warm_on_startup: self.warm_cache() def init_cache_tables(self): """Ensure cache tables exist""" # Tables created by migration script # Verify they exist with self.db.get_connection() as conn: cursor = conn.execute(""" SELECT name FROM sqlite_master WHERE type='table' AND name='agent_execution_cache' """) if not cursor.fetchone(): # Create if not exists (for testing) conn.execute(""" CREATE TABLE IF NOT EXISTS agent_execution_cache ( id INTEGER PRIMARY KEY AUTOINCREMENT, agent_name TEXT NOT NULL, input_hash TEXT NOT NULL, output TEXT NOT NULL, execution_ms INTEGER, created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, expires_at TIMESTAMP, hit_count INTEGER DEFAULT 0, UNIQUE(agent_name, input_hash) ) """) conn.commit() def warm_cache(self): """Preload cache with common operations""" print("Warming cache with common patterns...") # Common queries to preload common_patterns = [ ("find_functions", {"pattern": "*.py", "limit": 100}), ("extract_imports", {"pattern": "*.py"}), ("find_todos", {"pattern": "*"}), ("check_complexity", {"pattern": "*.py", "threshold": 10}), ("find_duplicates", {"threshold": 0.85}), ("unused_imports", {"pattern": "*.py"}), ] warmed = 0 for agent_name, inputs in common_patterns: # Check if already cached input_hash = self._hash_inputs(inputs) if not self._get_cached(agent_name, input_hash): # Would compute here if we had the actual agent # For now, just mark as pattern to warm warmed += 1 print(f"Cache warming complete: {warmed} patterns marked") def get_or_compute(self, agent_name: str, inputs: Dict, compute_fn: Callable, force_refresh: bool = False) -> Any: """ Get cached result or compute and cache Args: agent_name: Name of the agent inputs: Input parameters compute_fn: Function to compute result if not cached force_refresh: Force recomputation even if cached Returns: Cached or computed result """ input_hash = self._hash_inputs(inputs) # Check L1 memory cache first (fastest) memory_key = f"{agent_name}:{input_hash}" if not force_refresh and memory_key in self._memory_cache: self.stats.hits += 1 return self._memory_cache[memory_key]['result'] # Check L2 database cache if not force_refresh: cached = self._get_cached(agent_name, input_hash) if cached and not self._is_expired(cached): self._increment_hit_count(agent_name, input_hash) self.stats.hits += 1 self.stats.total_saved_ms += cached.get('execution_ms', 0) result = json.loads(cached['output']) # Promote to L1 cache with self._cache_lock: self._memory_cache[memory_key] = { 'result': result, 'timestamp': datetime.now() } return result # Cache miss - compute result self.stats.misses += 1 start_time = time.time() try: result = compute_fn(inputs) except Exception as e: # Don't cache errors raise e execution_ms = int((time.time() - start_time) * 1000) # Store in both L1 and L2 cache self._store_cache(agent_name, input_hash, result, execution_ms) with self._cache_lock: self._memory_cache[memory_key] = { 'result': result, 'timestamp': datetime.now() } # Check cache size and evict if necessary self._check_cache_size() return result def invalidate(self, agent_name: Optional[str] = None, pattern: Optional[str] = None): """ Invalidate cache entries Args: agent_name: Invalidate all entries for this agent pattern: Invalidate entries matching pattern """ with self._cache_lock: if agent_name: # Clear L1 cache for agent keys_to_remove = [k for k in self._memory_cache if k.startswith(f"{agent_name}:")] for key in keys_to_remove: del self._memory_cache[key] # Clear L2 cache for agent with self.db.get_connection() as conn: conn.execute( "DELETE FROM agent_execution_cache WHERE agent_name = ?", (agent_name,) ) conn.commit() else: # Clear all caches self._memory_cache.clear() with self.db.get_connection() as conn: conn.execute("DELETE FROM agent_execution_cache") conn.commit() def get_stats(self) -> Dict: """Get cache statistics""" stats = self.stats.to_dict() # Add database stats with self.db.get_connection() as conn: cursor = conn.execute(""" SELECT COUNT(*), SUM(LENGTH(output)), AVG(hit_count) FROM agent_execution_cache """) count, total_size, avg_hits = cursor.fetchone() stats['db_entries'] = count or 0 stats['db_size_bytes'] = total_size or 0 stats['avg_hits_per_entry'] = avg_hits or 0 stats['memory_entries'] = len(self._memory_cache) return stats def _hash_inputs(self, inputs: Dict) -> str: """Create deterministic hash of inputs""" # Sort keys for consistency normalized = json.dumps(inputs, sort_keys=True, default=str) return hashlib.sha256(normalized.encode()).hexdigest()[:16] def _get_cached(self, agent_name: str, input_hash: str) -> Optional[Dict]: """Get cached entry from database""" with self.db.get_connection() as conn: cursor = conn.execute(""" SELECT output, execution_ms, created_at, expires_at, hit_count FROM agent_execution_cache WHERE agent_name = ? AND input_hash = ? """, (agent_name, input_hash)) row = cursor.fetchone() if row: return { 'output': row[0], 'execution_ms': row[1], 'created_at': row[2], 'expires_at': row[3], 'hit_count': row[4] } return None def _is_expired(self, cached: Dict) -> bool: """Check if cache entry is expired""" if not cached.get('expires_at'): # No expiration set expires_at = datetime.fromisoformat(cached['created_at']) + self.ttl else: expires_at = datetime.fromisoformat(cached['expires_at']) return datetime.now() > expires_at def _increment_hit_count(self, agent_name: str, input_hash: str): """Increment hit count for cache entry""" with self.db.get_connection() as conn: conn.execute(""" UPDATE agent_execution_cache SET hit_count = hit_count + 1 WHERE agent_name = ? AND input_hash = ? """, (agent_name, input_hash)) conn.commit() def _store_cache(self, agent_name: str, input_hash: str, result: Any, execution_ms: int): """Store result in cache""" output_json = json.dumps(result, default=str) expires_at = datetime.now() + self.ttl with self.db.get_connection() as conn: conn.execute(""" INSERT OR REPLACE INTO agent_execution_cache (agent_name, input_hash, output, execution_ms, created_at, expires_at, hit_count) VALUES (?, ?, ?, ?, ?, ?, 0) """, (agent_name, input_hash, output_json, execution_ms, datetime.now().isoformat(), expires_at.isoformat())) conn.commit() self.stats.cache_size_bytes += len(output_json) def _check_cache_size(self): """Check cache size and evict if necessary""" with self.db.get_connection() as conn: # Get current cache size cursor = conn.execute(""" SELECT SUM(LENGTH(output)) FROM agent_execution_cache """) current_size = cursor.fetchone()[0] or 0 if current_size > self.max_size_bytes: # Evict least recently used entries evict_size = current_size - (self.max_size_bytes * 0.8) # Free 20% conn.execute(""" DELETE FROM agent_execution_cache WHERE id IN ( SELECT id FROM agent_execution_cache ORDER BY hit_count ASC, created_at ASC LIMIT ( SELECT COUNT(*) FROM agent_execution_cache WHERE LENGTH(output) < ? ) ) """, (evict_size,)) deleted = conn.total_changes conn.commit() self.stats.evictions += deleted # Clear L1 cache too with self._cache_lock: if len(self._memory_cache) > 100: # Arbitrary limit # Keep most recent 50 items = sorted(self._memory_cache.items(), key=lambda x: x[1]['timestamp'], reverse=True) self._memory_cache = dict(items[:50]) def preload_for_file(self, file_path: str): """ Preload cache for a specific file Useful when opening a file in editor """ common_agents = [ "import_analyzer", "complexity_analyzer", "function_extractor", "class_analyzer" ] for agent in common_agents: inputs = {"file_path": file_path} input_hash = self._hash_inputs(inputs) # Check if already cached if not self._get_cached(agent, input_hash): # Mark for warming (would compute in real implementation) pass def export_cache(self, output_path: Path) -> int: """Export cache to file for backup/sharing""" cache_data = [] with self.db.get_connection() as conn: cursor = conn.execute(""" SELECT agent_name, input_hash, output, execution_ms, hit_count FROM agent_execution_cache WHERE hit_count > 1 ORDER BY hit_count DESC """) for row in cursor: cache_data.append({ 'agent': row[0], 'hash': row[1], 'output': row[2], 'ms': row[3], 'hits': row[4] }) with open(output_path, 'w') as f: json.dump(cache_data, f, indent=2) return len(cache_data) def import_cache(self, input_path: Path) -> int: """Import cache from file""" with open(input_path, 'r') as f: cache_data = json.load(f) imported = 0 with self.db.get_connection() as conn: for entry in cache_data: conn.execute(""" INSERT OR IGNORE INTO agent_execution_cache (agent_name, input_hash, output, execution_ms, hit_count) VALUES (?, ?, ?, ?, ?) """, (entry['agent'], entry['hash'], entry['output'], entry['ms'], entry['hits'])) imported += 1 conn.commit() return imported

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/stevenjjobson/scs-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server