# cache_service.py
import time
from typing import Any, Dict, List, Optional, Tuple, Union
from secure_mcp_gateway.plugins.telemetry import get_telemetry_config_manager
# Get metrics from telemetry manager
telemetry_manager = get_telemetry_config_manager()
# Telemetry metrics will be obtained lazily when needed
from secure_mcp_gateway.utils import (
get_common_config,
logger,
mask_key,
)
# Get debug log level
common_config = get_common_config()
ENKRYPT_LOG_LEVEL = common_config.get("enkrypt_log_level", "INFO").lower()
IS_DEBUG_LOG_LEVEL = ENKRYPT_LOG_LEVEL == "debug"
class CacheService:
"""
Cache service for Enkrypt Secure MCP Gateway.
Handles all caching operations including tool caching, gateway config caching, and cache management.
"""
def __init__(self):
"""Initialize the cache service."""
# logger.info("Initializing Enkrypt Secure MCP Gateway Cache Service")
# Get configuration
self.common_config = get_common_config()
# Cache configuration
self.tool_cache_expiration = int(
self.common_config.get("enkrypt_tool_cache_expiration", 4)
)
self.gateway_cache_expiration = int(
self.common_config.get("enkrypt_gateway_cache_expiration", 24)
)
self.use_external_cache = self.common_config.get(
"enkrypt_mcp_use_external_cache", False
)
# Initialize cache client
self.cache_client = None
self._initialize_cache()
logger.info("Cache service initialized:")
logger.info(f" - Tool cache expiration: {self.tool_cache_expiration} hours")
logger.info(
f" - Gateway cache expiration: {self.gateway_cache_expiration} hours"
)
logger.info(f" - External cache enabled: {self.use_external_cache}")
def _initialize_cache(self):
"""Initialize the cache client."""
if self.use_external_cache:
logger.info("Initializing External Cache connection")
try:
from secure_mcp_gateway.client import initialize_cache
self.cache_client = initialize_cache()
logger.info("[external_cache] Successfully connected to External Cache")
except Exception as e:
# Use standardized error handling
from secure_mcp_gateway.error_handling import error_logger
from secure_mcp_gateway.exceptions import (
ErrorCode,
ErrorContext,
create_system_error,
)
context = ErrorContext(
operation="cache.external_connection",
additional_context={
"cache_host": self.cache_host,
"cache_port": self.cache_port,
},
)
error = create_system_error(
code=ErrorCode.CACHE_CONNECTION_FAILED,
message=f"Failed to connect to External Cache: {e}",
context=context,
cause=e,
)
error_logger.log_error(error)
logger.error(
f"[external_cache] Failed to connect to External Cache: {e}"
)
self.cache_client = None
else:
logger.info("External Cache is not enabled. Using local cache only.")
self.cache_client = None
def get_redis_ttl(self, key: str) -> tuple:
"""
Get the TTL (Time To Live) for a Redis key.
Args:
key (str): Redis key to check
Returns:
tuple: (ttl_seconds, expires_at_timestamp) or (None, None) if key doesn't exist or no external cache
"""
if not self.cache_client or not key:
return None, None
try:
ttl_seconds = self.cache_client.ttl(key)
if ttl_seconds == -1:
# Key exists but has no expiration
return None, None
elif ttl_seconds == -2:
# Key doesn't exist
return None, None
else:
# Key exists and has expiration
expires_at = time.time() + ttl_seconds
return ttl_seconds, expires_at
except Exception as e:
# Use standardized error handling
from secure_mcp_gateway.error_handling import error_logger
from secure_mcp_gateway.exceptions import (
ErrorCode,
ErrorContext,
create_system_error,
)
context = ErrorContext(
operation="cache.get_ttl",
additional_context={"cache_key": key},
)
error = create_system_error(
code=ErrorCode.CACHE_OPERATION_FAILED,
message=f"Error getting TTL for key {key}: {e}",
context=context,
cause=e,
)
error_logger.log_error(error)
logger.error(f"[get_redis_ttl] Error getting TTL for key {key}: {e}")
return None, None
def cache_tools(
self, server_id: str, server_name: str, tools: List[Dict[str, Any]]
) -> bool:
"""
Cache tools for a specific server.
Args:
server_id (str): Server ID
server_name (str): Server name
tools (List[Dict[str, Any]]): Tools to cache
Returns:
bool: True if caching was successful, False otherwise
"""
try:
from secure_mcp_gateway.client import cache_tools
cache_tools(self.cache_client, server_id, server_name, tools)
logger.info(f"[cache_tools] Successfully cached tools for {server_name}")
return True
except Exception as e:
logger.error(f"[cache_tools] Failed to cache tools for {server_name}: {e}")
return False
def get_cached_tools(
self, server_id: str, server_name: str
) -> Optional[Union[List[Dict[str, Any]], Tuple[List[Dict[str, Any]], str]]]:
"""
Get cached tools for a specific server.
Args:
server_id (str): Server ID
server_name (str): Server name
Returns:
Optional[Union[List[Dict[str, Any]], Tuple[List[Dict[str, Any]], str]]]:
Cached tools or tuple of (tools, expires_at), or None if not found
"""
try:
from secure_mcp_gateway.client import get_cached_tools
return get_cached_tools(self.cache_client, server_id, server_name)
except Exception as e:
logger.error(
f"[get_cached_tools] Failed to get cached tools for {server_name}: {e}"
)
return None
def cache_gateway_config(self, gateway_id: str, config: Dict[str, Any]) -> bool:
"""
Cache gateway configuration.
Args:
gateway_id (str): Gateway ID
config (Dict[str, Any]): Configuration to cache
Returns:
bool: True if caching was successful, False otherwise
"""
try:
from secure_mcp_gateway.client import cache_gateway_config
cache_gateway_config(self.cache_client, gateway_id, config)
logger.info(
f"[cache_gateway_config] Successfully cached gateway config for {gateway_id}"
)
return True
except Exception as e:
logger.error(
f"[cache_gateway_config] Failed to cache gateway config for {gateway_id}: {e}"
)
return False
def get_cached_gateway_config(
self, gateway_id: str
) -> Optional[Union[Dict[str, Any], Tuple[Dict[str, Any], str]]]:
"""
Get cached gateway configuration.
Args:
gateway_id (str): Gateway ID
Returns:
Optional[Union[Dict[str, Any], Tuple[Dict[str, Any], str]]]:
Cached config or tuple of (config, expires_at), or None if not found
"""
try:
from secure_mcp_gateway.client import get_cached_gateway_config
return get_cached_gateway_config(self.cache_client, gateway_id)
except Exception as e:
logger.error(
f"[get_cached_gateway_config] Failed to get cached gateway config for {gateway_id}: {e}"
)
return None
def clear_cache_for_servers(self, id: str, server_name: str = None) -> int:
"""
Clear cache for specific servers.
Args:
id (str): Gateway/User ID
server_name (str, optional): Name of the server to clear cache for. If None, clears all servers.
Returns:
int: Number of cache entries cleared
"""
try:
from secure_mcp_gateway.client import clear_cache_for_servers
count = clear_cache_for_servers(self.cache_client, id, server_name)
logger.info(
f"[clear_cache_for_servers] Successfully cleared {count} cache entries for id={id}, server_name={server_name}"
)
return count
except Exception as e:
logger.error(
f"[clear_cache_for_servers] Failed to clear cache for id={id}, server_name={server_name}: {e}"
)
return 0
def clear_gateway_config_cache(self, id: str, gateway_key: str) -> bool:
"""
Clear gateway configuration cache.
Args:
id (str): Gateway/User ID
gateway_key (str): Gateway key to clear cache for
Returns:
bool: True if clearing was successful, False otherwise
"""
try:
from secure_mcp_gateway.client import clear_gateway_config_cache
result = clear_gateway_config_cache(self.cache_client, id, gateway_key)
logger.info(
f"[clear_gateway_config_cache] Successfully cleared gateway config cache for id={id}, gateway_key={mask_key(gateway_key)}"
)
return result
except Exception as e:
logger.error(
f"[clear_gateway_config_cache] Failed to clear gateway config cache for id={id}, gateway_key={mask_key(gateway_key)}: {e}"
)
return False
def get_cache_statistics(self) -> Dict[str, Any]:
"""
Get cache statistics.
Returns:
Dict[str, Any]: Cache statistics including total caches, cache type, etc.
"""
try:
from secure_mcp_gateway.client import get_cache_statistics
return get_cache_statistics(self.cache_client)
except Exception as e:
logger.error(f"[get_cache_statistics] Failed to get cache statistics: {e}")
return {
"total_tool_caches": 0,
"total_config_caches": 0,
"cache_type": "error",
"error": str(e),
}
def get_server_hashed_key(self, id: str, server_name: str) -> str:
"""
Get hashed key for a server.
Args:
id (str): Gateway/User ID
server_name (str): Server name
Returns:
str: Hashed key for the server
"""
try:
from secure_mcp_gateway.client import get_server_hashed_key
return get_server_hashed_key(id, server_name)
except Exception as e:
logger.error(
f"[get_server_hashed_key] Failed to get hashed key for {server_name}: {e}"
)
return ""
def get_gateway_config_hashed_key(self, gateway_key: str) -> str:
"""
Get hashed key for gateway configuration.
Args:
gateway_key (str): Gateway key
Returns:
str: Hashed key for the gateway configuration
"""
try:
from secure_mcp_gateway.client import get_gateway_config_hashed_key
return get_gateway_config_hashed_key(gateway_key)
except Exception as e:
logger.error(
f"[get_gateway_config_hashed_key] Failed to get hashed key for gateway: {e}"
)
return ""
def get_id_from_key(self, key: str) -> str:
"""
Get ID from a cache key.
Args:
key (str): Cache key
Returns:
str: ID extracted from the key
"""
try:
from secure_mcp_gateway.client import get_id_from_key
return get_id_from_key(key)
except Exception as e:
logger.error(f"[get_id_from_key] Failed to get ID from key: {e}")
return ""
def cache_key_to_id(self, key: str) -> str:
"""
Convert cache key to ID.
Args:
key (str): Cache key
Returns:
str: ID from the cache key
"""
try:
from secure_mcp_gateway.client import cache_key_to_id
return cache_key_to_id(key)
except Exception as e:
logger.error(f"[cache_key_to_id] Failed to convert key to ID: {e}")
return ""
def is_cache_available(self) -> bool:
"""
Check if cache is available.
Returns:
bool: True if cache is available, False otherwise
"""
return self.cache_client is not None
def get_cache_type(self) -> str:
"""
Get the type of cache being used.
Returns:
str: Cache type ('external', 'local', or 'none')
"""
if self.use_external_cache and self.cache_client:
return "external"
elif not self.use_external_cache:
return "local"
else:
return "none"
def get_cache_config(self) -> Dict[str, Any]:
"""
Get cache configuration.
Returns:
Dict[str, Any]: Cache configuration
"""
return {
"tool_cache_expiration_hours": self.tool_cache_expiration,
"gateway_cache_expiration_hours": self.gateway_cache_expiration,
"use_external_cache": self.use_external_cache,
"cache_type": self.get_cache_type(),
"cache_available": self.is_cache_available(),
}
def get_latest_server_info(self, server_info, id, cache_client):
"""
Returns a fresh copy of server info with the latest tools.
Args:
server_info (dict): Original server configuration
id (str): ID of the Gateway or User
cache_client: Cache client instance
Returns:
dict: Updated server info with latest tools from config or cache
"""
if IS_DEBUG_LOG_LEVEL:
logger.debug(
f"[get_latest_server_info] Getting latest server info for {id}"
)
server_info_copy = server_info.copy()
config_tools = server_info_copy.get("tools", {})
server_name = server_info_copy.get("server_name")
logger.info(f"[get_latest_server_info] Server name: {server_name}")
# If tools is empty {}, then we discover them
if not config_tools:
logger.info(
f"[get_latest_server_info] No config tools found for {server_name}"
)
cached_tools = self.get_cached_tools(id, server_name)
if cached_tools:
# Update metrics lazily
if (
hasattr(telemetry_manager, "cache_hit_counter")
and telemetry_manager.cache_hit_counter
):
telemetry_manager.cache_hit_counter.add(1)
if IS_DEBUG_LOG_LEVEL:
logger.debug(
f"[get_latest_server_info] Found cached tools for {server_name}"
)
server_info_copy["tools"] = cached_tools
server_info_copy["has_cached_tools"] = True
server_info_copy["tools_source"] = "cache"
else:
# Update metrics lazily
if (
hasattr(telemetry_manager, "cache_miss_counter")
and telemetry_manager.cache_miss_counter
):
telemetry_manager.cache_miss_counter.add(1)
if IS_DEBUG_LOG_LEVEL:
logger.debug(
f"[get_latest_server_info] No cached tools found for {server_name}. Need to discover them"
)
server_info_copy["tools"] = {}
server_info_copy["has_cached_tools"] = False
server_info_copy["tools_source"] = "needs_discovery"
else:
if IS_DEBUG_LOG_LEVEL:
logger.debug(
f"[get_latest_server_info] Tools defined in config for {server_name}, checking cache first"
)
# Check if config tools are already cached for better performance
cached_tools = self.get_cached_tools(id, server_name)
if cached_tools:
# Use cached tools (faster than reading config)
if IS_DEBUG_LOG_LEVEL:
logger.debug(
f"[get_latest_server_info] Found cached tools for {server_name}, using cache"
)
server_info_copy["tools"] = cached_tools
server_info_copy["has_cached_tools"] = True
server_info_copy["tools_source"] = "cache"
else:
# Cache the config tools for future use
if IS_DEBUG_LOG_LEVEL:
logger.debug(
f"[get_latest_server_info] Caching config tools for {server_name}"
)
self.cache_tools(id, server_name, config_tools)
server_info_copy["tools"] = config_tools
server_info_copy["has_cached_tools"] = True
server_info_copy["tools_source"] = "config"
return server_info_copy
# Global cache service instance
cache_service = CacheService()
# Export the cache client for backward compatibility
cache_client = cache_service.cache_client
# Export configuration constants for backward compatibility
ENKRYPT_TOOL_CACHE_EXPIRATION = cache_service.tool_cache_expiration
ENKRYPT_GATEWAY_CACHE_EXPIRATION = cache_service.gateway_cache_expiration
ENKRYPT_MCP_USE_EXTERNAL_CACHE = cache_service.use_external_cache