Skip to main content
Glama

NetBox Read/Write MCP Server

cache.py15.2 kB
#!/usr/bin/env python3 """ PHASE 2 OPTIMIZATION: Intelligent Caching Layer Context-aware caching with relationship understanding and TTL-based strategies for optimal NetBox MCP query performance. """ import logging import hashlib import json from typing import Dict, Any, Optional, List, Tuple from cachetools import TTLCache import threading import time logger = logging.getLogger(__name__) class SmartCache: """ Context-aware caching with relationship understanding. Performance Strategy: - Static data (manufacturers, device types, sites): Long TTL (30min-1hr) - Dynamic data (devices, search results): Short TTL (5min) - Failed queries: Medium TTL (10min) to avoid repeated failures - Relationship extraction: Pre-populate related caches from query results """ def __init__(self): # Static data caches (long TTL) - rarely change self.manufacturers_cache = TTLCache(maxsize=100, ttl=3600) # 1 hour self.device_types_cache = TTLCache(maxsize=500, ttl=1800) # 30 min self.device_roles_cache = TTLCache(maxsize=100, ttl=3600) # 1 hour - PHASE 2.3: Role caching self.sites_cache = TTLCache(maxsize=100, ttl=1800) # 30 min self.racks_cache = TTLCache(maxsize=200, ttl=1800) # 30 min # Dynamic data caches (short TTL) - frequently updated self.devices_cache = TTLCache(maxsize=1000, ttl=300) # 5 min self.device_search_cache = TTLCache(maxsize=200, ttl=300) # 5 min self.device_list_cache = TTLCache(maxsize=50, ttl=300) # 5 min # Failed query cache (avoid repeated failures) self.failed_filters_cache = TTLCache(maxsize=100, ttl=600) # 10 min # Performance tracking self.cache_stats = { "hits": 0, "misses": 0, "failed_queries_avoided": 0, "relationship_extractions": 0 } # Thread safety self._lock = threading.RLock() def _generate_cache_key(self, operation: str, **kwargs) -> str: """ Generate deterministic cache key from operation and parameters. Args: operation: The type of operation (e.g., "list_devices", "get_device") **kwargs: Query parameters Returns: Deterministic cache key string """ # Sort kwargs for deterministic key generation sorted_kwargs = dict(sorted(kwargs.items())) key_data = {"op": operation, "params": sorted_kwargs} key_json = json.dumps(key_data, sort_keys=True) return hashlib.md5(key_json.encode()).hexdigest() def get_cached_result(self, operation: str, **kwargs) -> Optional[Tuple[Any, str]]: """ Retrieve cached result for the given operation and parameters. Args: operation: The type of operation **kwargs: Query parameters Returns: Tuple of (cached_result, cache_type) or None if not cached """ cache_key = self._generate_cache_key(operation, **kwargs) with self._lock: # Check if this is a known failed query if cache_key in self.failed_filters_cache: self.cache_stats["failed_queries_avoided"] += 1 logger.debug(f"Avoiding known failed query: {operation} with {kwargs}") return None, "failed_query_avoided" # Try different cache types based on operation cache_mapping = { "list_devices": self.device_list_cache, "search_devices": self.device_search_cache, "get_device": self.devices_cache, "list_manufacturers": self.manufacturers_cache, "list_device_types": self.device_types_cache, "list_device_roles": self.device_roles_cache, # PHASE 2.3: Role caching "list_sites": self.sites_cache, "list_racks": self.racks_cache } cache = cache_mapping.get(operation) if cache and cache_key in cache: self.cache_stats["hits"] += 1 logger.debug(f"Cache HIT for {operation}: {cache_key}") return cache[cache_key], "cache_hit" self.cache_stats["misses"] += 1 logger.debug(f"Cache MISS for {operation}: {cache_key}") return None, "cache_miss" def cache_result(self, operation: str, result: Any, **kwargs) -> None: """ Cache the result of an operation and extract relationship data. Args: operation: The type of operation result: The result to cache **kwargs: Query parameters used for the operation """ cache_key = self._generate_cache_key(operation, **kwargs) with self._lock: # Determine which cache to use cache_mapping = { "list_devices": self.device_list_cache, "search_devices": self.device_search_cache, "get_device": self.devices_cache, "list_manufacturers": self.manufacturers_cache, "list_device_types": self.device_types_cache, "list_device_roles": self.device_roles_cache, # PHASE 2.3: Role caching "list_sites": self.sites_cache, "list_racks": self.racks_cache } cache = cache_mapping.get(operation) if cache: cache[cache_key] = result logger.debug(f"Cached result for {operation}: {cache_key}") # Extract relationship data for pre-population if operation in ["list_devices", "search_devices", "get_device"]: self._extract_and_cache_relationships(result) def cache_failed_query(self, operation: str, error: str, **kwargs) -> None: """ Cache information about a failed query to avoid repeating it. Args: operation: The type of operation that failed error: Error description **kwargs: Query parameters that caused the failure """ cache_key = self._generate_cache_key(operation, **kwargs) with self._lock: self.failed_filters_cache[cache_key] = { "error": error, "timestamp": time.time(), "operation": operation, "params": kwargs } logger.debug(f"Cached failed query: {operation} - {error}") def _extract_and_cache_relationships(self, result: Any) -> None: """ Extract relationship data from query results and pre-populate caches. This reduces future API calls by caching related objects found in responses. Args: result: Query result containing device data """ try: devices = [] # Handle different result formats if isinstance(result, dict): if "devices" in result: devices = result["devices"] elif isinstance(result.get("data"), list): devices = result["data"] elif isinstance(result, list): devices = result # Extract and cache relationship data for device in devices: if not isinstance(device, dict): continue # Cache manufacturer data device_type = device.get("device_type") if isinstance(device_type, dict): manufacturer = device_type.get("manufacturer") if isinstance(manufacturer, dict) and "name" in manufacturer: mfg_key = self._generate_cache_key("get_manufacturer", name=manufacturer["name"]) self.manufacturers_cache[mfg_key] = manufacturer # Cache site data site = device.get("site") if isinstance(site, dict) and "name" in site: site_key = self._generate_cache_key("get_site", name=site["name"]) self.sites_cache[site_key] = site # Cache rack data rack = device.get("rack") if isinstance(rack, dict) and "name" in rack: rack_key = self._generate_cache_key("get_rack", name=rack["name"]) self.racks_cache[rack_key] = rack # Cache device type data if isinstance(device_type, dict) and "model" in device_type: dt_key = self._generate_cache_key("get_device_type", model=device_type["model"]) self.device_types_cache[dt_key] = device_type # PHASE 2.3: Cache device role data role = device.get("role") if isinstance(role, dict) and "slug" in role: role_key = self._generate_cache_key("get_device_role", slug=role["slug"]) self.device_roles_cache[role_key] = role if devices: self.cache_stats["relationship_extractions"] += 1 logger.debug(f"Extracted relationships from {len(devices)} devices") except Exception as e: logger.warning(f"Failed to extract relationships from result: {e}") def clear_cache(self, cache_type: Optional[str] = None) -> None: """ Clear specified cache or all caches. Args: cache_type: Specific cache to clear, or None to clear all """ with self._lock: if cache_type == "manufacturers": self.manufacturers_cache.clear() elif cache_type == "device_types": self.device_types_cache.clear() elif cache_type == "sites": self.sites_cache.clear() elif cache_type == "racks": self.racks_cache.clear() elif cache_type == "devices": self.devices_cache.clear() self.device_search_cache.clear() self.device_list_cache.clear() elif cache_type == "failed_queries": self.failed_filters_cache.clear() else: # Clear all caches self.manufacturers_cache.clear() self.device_types_cache.clear() self.sites_cache.clear() self.racks_cache.clear() self.devices_cache.clear() self.device_search_cache.clear() self.device_list_cache.clear() self.failed_filters_cache.clear() logger.info(f"Cleared cache: {cache_type or 'all'}") def get_cache_stats(self) -> Dict[str, Any]: """ Get current cache performance statistics. Returns: Dictionary containing cache performance metrics """ with self._lock: stats = self.cache_stats.copy() # Calculate hit rate total_requests = stats["hits"] + stats["misses"] hit_rate = (stats["hits"] / total_requests * 100) if total_requests > 0 else 0 # Add cache sizes stats.update({ "hit_rate_percentage": round(hit_rate, 2), "total_requests": total_requests, "cache_sizes": { "manufacturers": len(self.manufacturers_cache), "device_types": len(self.device_types_cache), "sites": len(self.sites_cache), "racks": len(self.racks_cache), "devices": len(self.devices_cache), "device_search": len(self.device_search_cache), "device_list": len(self.device_list_cache), "failed_queries": len(self.failed_filters_cache) } }) return stats def warm_cache(self, client, priority_operations: Optional[List[str]] = None) -> None: """ Pre-warm cache with commonly accessed data. Args: client: NetBox client instance priority_operations: List of operations to prioritize for warming """ try: default_operations = ["list_manufacturers", "list_sites", "list_device_types"] operations = priority_operations or default_operations logger.info(f"Warming cache with operations: {operations}") for operation in operations: try: if operation == "list_manufacturers" and not self.manufacturers_cache: manufacturers = list(client.dcim.manufacturers.all()) self.cache_result("list_manufacturers", manufacturers) elif operation == "list_sites" and not self.sites_cache: sites = list(client.dcim.sites.all()) self.cache_result("list_sites", sites) elif operation == "list_device_types" and not self.device_types_cache: device_types = list(client.dcim.device_types.all()) self.cache_result("list_device_types", device_types) except Exception as e: logger.warning(f"Failed to warm cache for {operation}: {e}") self.cache_failed_query(operation, str(e)) logger.info("Cache warming completed") except Exception as e: logger.error(f"Cache warming failed: {e}") # Global cache instance _global_cache: Optional[SmartCache] = None _cache_lock = threading.Lock() def get_cache() -> SmartCache: """ Get or create the global cache instance (singleton pattern). Returns: Global SmartCache instance """ global _global_cache if _global_cache is None: with _cache_lock: if _global_cache is None: _global_cache = SmartCache() logger.info("Initialized global SmartCache instance") return _global_cache def cache_enabled_operation(operation_name: str): """ Decorator to add caching to any operation. Usage: @cache_enabled_operation("list_devices") def my_device_function(client, **kwargs): return expensive_api_call(client, **kwargs) """ def decorator(func): def wrapper(*args, **kwargs): cache = get_cache() # Try to get cached result cached_result, cache_type = cache.get_cached_result(operation_name, **kwargs) if cached_result is not None: return cached_result try: # Execute the actual operation result = func(*args, **kwargs) # Cache the successful result cache.cache_result(operation_name, result, **kwargs) return result except Exception as e: # Cache the failed operation cache.cache_failed_query(operation_name, str(e), **kwargs) raise return wrapper return decorator

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/Deployment-Team/netbox-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server