Skip to main content
Glama
cache_manager.py (17 kB)
"""
In-memory L1 cache and circuit breaker used to improve the performance of
the HTTP-MCP bridge.
"""

import asyncio
import json
import logging
import time
from collections import OrderedDict
from dataclasses import dataclass
from datetime import datetime, timedelta
from enum import Enum
from typing import Any, Dict, Optional, Tuple

logger = logging.getLogger(__name__)


class CircuitState(Enum):
    """States of the circuit breaker."""

    CLOSED = "closed"  # Normal operation
    OPEN = "open"  # Circuit open (failures detected)
    HALF_OPEN = "half_open"  # Probing for recovery


@dataclass
class CacheEntry:
    """Cache entry together with its bookkeeping metadata."""

    value: Any
    timestamp: float  # time.time() at which the entry was stored
    ttl: float  # lifetime in seconds, counted from ``timestamp``
    hit_count: int = 0

    @property
    def is_expired(self) -> bool:
        """Return True once the current time is past ``timestamp + ttl``."""
        return time.time() > self.timestamp + self.ttl

    @property
    def age_seconds(self) -> float:
        """Return the number of seconds elapsed since the entry was stored."""
        return time.time() - self.timestamp


@dataclass
class CircuitBreakerConfig:
    """Configuration of the circuit breaker."""

    failure_threshold: int = 5  # Failures (while CLOSED) before opening
    recovery_timeout: float = 30.0  # Seconds to wait before probing recovery
    success_threshold: int = 3  # Successes (while HALF_OPEN) before closing
    timeout: float = 10.0  # Per-call timeout in seconds


@dataclass
class CircuitBreakerStats:
    """Snapshot of the circuit breaker counters."""

    state: CircuitState
    failure_count: int
    success_count: int
    last_failure_time: Optional[float]
    last_success_time: Optional[float]
    state_change_time: float
    total_requests: int
    successful_requests: int


class LRUCache:
    """LRU (least recently used) cache with per-entry TTL.

    All mutating operations are serialized with an ``asyncio.Lock``, which
    protects against interleaving between coroutines on one event loop. It is
    not a thread lock.
    """

    def __init__(self, max_size: int = 1000, default_ttl: float = 300.0):
        """
        Initialize the LRU cache.

        Args:
            max_size: Maximum number of entries kept in the cache
            default_ttl: Default TTL in seconds (5 minutes)
        """
        self.max_size = max_size
        self.default_ttl = default_ttl
        self._cache: OrderedDict[str, CacheEntry] = OrderedDict()
        self._lock = asyncio.Lock()
        self._stats = {
            'hits': 0,
            'misses': 0,
            'evictions': 0,
            'expired': 0,
            'size': 0
        }

    async def get(self, key: str) -> Optional[Any]:
        """
        Fetch a value from the cache.

        Args:
            key: Key of the entry

        Returns:
            The stored value if present and not expired, None otherwise.
            A stored value of None is therefore indistinguishable from a miss.
        """
        async with self._lock:
            entry = self._cache.get(key)
            if entry is None:
                self._stats['misses'] += 1
                return None

            # An expired entry is dropped and counted as both expired and miss
            if entry.is_expired:
                del self._cache[key]
                self._stats['expired'] += 1
                self._stats['misses'] += 1
                self._stats['size'] = len(self._cache)
                return None

            # Move to the end: the end of the OrderedDict is "most recent"
            self._cache.move_to_end(key)
            entry.hit_count += 1
            self._stats['hits'] += 1

            logger.debug(f"Cache HIT: {key} (age: {entry.age_seconds:.1f}s, hits: {entry.hit_count})")
            return entry.value

    async def set(self, key: str, value: Any, ttl: Optional[float] = None) -> None:
        """
        Store a value in the cache.

        Args:
            key: Key of the entry
            value: Value to store
            ttl: Lifetime in seconds (``default_ttl`` is used when None)
        """
        async with self._lock:
            # Only None selects the default; an explicit 0 must be honoured
            # rather than silently replaced by default_ttl.
            if ttl is None:
                ttl = self.default_ttl

            # Insert or overwrite, then mark as most recently used
            self._cache[key] = CacheEntry(
                value=value,
                timestamp=time.time(),
                ttl=ttl
            )
            self._cache.move_to_end(key)
            self._stats['size'] = len(self._cache)

            # Evict least recently used entries (front of the OrderedDict)
            # until the size limit is respected again.
            while self._cache and len(self._cache) > self.max_size:
                self._cache.popitem(last=False)
                self._stats['evictions'] += 1
                self._stats['size'] = len(self._cache)

            logger.debug(f"Cache SET: {key} (ttl: {ttl}s)")

    async def delete(self, key: str) -> bool:
        """
        Remove an entry from the cache.

        Args:
            key: Key to remove

        Returns:
            True if the entry was removed, False if it was not found
        """
        async with self._lock:
            if key in self._cache:
                del self._cache[key]
                self._stats['size'] = len(self._cache)
                logger.debug(f"Cache DELETE: {key}")
                return True
            return False

    async def clear(self) -> None:
        """Empty the cache (hit/miss/eviction counters are kept)."""
        async with self._lock:
            cleared_count = len(self._cache)
            self._cache.clear()
            self._stats['size'] = 0
            logger.info(f"Cache cleared: {cleared_count} entries removed")

    async def cleanup_expired(self) -> int:
        """
        Remove every expired entry.

        Returns:
            Number of entries removed
        """
        async with self._lock:
            # Collect first: the dict cannot be mutated while iterating it
            expired_keys = [
                key for key, entry in self._cache.items()
                if entry.is_expired
            ]

            for key in expired_keys:
                del self._cache[key]

            if expired_keys:
                self._stats['expired'] += len(expired_keys)
                self._stats['size'] = len(self._cache)
                logger.debug(f"Cache cleanup: {len(expired_keys)} expired entries removed")

            return len(expired_keys)

    def get_stats(self) -> Dict[str, Any]:
        """Return a copy of the cache counters plus derived statistics."""
        total_requests = self._stats['hits'] + self._stats['misses']
        hit_rate = (self._stats['hits'] / total_requests * 100) if total_requests > 0 else 0

        return {
            **self._stats,
            'total_requests': total_requests,
            'hit_rate_percent': round(hit_rate, 2),
            'max_size': self.max_size,
            'default_ttl': self.default_ttl
        }


class CircuitBreaker:
    """Circuit breaker protecting callers against failing external calls."""

    def __init__(self, config: CircuitBreakerConfig):
        """
        Initialize the circuit breaker.

        Args:
            config: Circuit breaker configuration
        """
        self.config = config
        self._state = CircuitState.CLOSED
        self._failure_count = 0
        self._success_count = 0
        self._last_failure_time: Optional[float] = None
        self._last_success_time: Optional[float] = None
        self._state_change_time = time.time()
        self._total_requests = 0
        self._successful_requests = 0
        self._lock = asyncio.Lock()

    def _recovery_timeout_elapsed(self) -> bool:
        """Return True if ``recovery_timeout`` has passed since the last failure."""
        if self._last_failure_time is None:
            return False
        return time.time() - self._last_failure_time >= self.config.recovery_timeout

    async def call(self, coro_func, *args, **kwargs) -> Any:
        """
        Run a coroutine function under the protection of the circuit breaker.

        Args:
            coro_func: Coroutine function to execute
            *args, **kwargs: Arguments forwarded to ``coro_func``

        Returns:
            Result of the function

        Raises:
            CircuitBreakerOpenError: If the circuit is open
            asyncio.TimeoutError: If ``config.timeout`` is exceeded
            Exception: Any error raised by the called function
        """
        async with self._lock:
            # Rejected calls are counted as requests too
            self._total_requests += 1

            if self._state == CircuitState.OPEN:
                # Allow a probe once the recovery timeout has elapsed
                if self._recovery_timeout_elapsed():
                    logger.info("Circuit breaker: Transition OPEN -> HALF_OPEN")
                    self._state = CircuitState.HALF_OPEN
                    self._state_change_time = time.time()
                else:
                    raise CircuitBreakerOpenError("Circuit breaker is OPEN")

        # The lock is released while the protected call runs, so calls are
        # not serialized; only the state bookkeeping is.
        try:
            result = await asyncio.wait_for(
                coro_func(*args, **kwargs),
                timeout=self.config.timeout
            )

            await self._on_success()
            return result

        except Exception:
            await self._on_failure()
            raise

    async def _on_success(self) -> None:
        """Record a successful call and update the state machine."""
        async with self._lock:
            self._successful_requests += 1
            self._last_success_time = time.time()

            if self._state == CircuitState.HALF_OPEN:
                self._success_count += 1
                if self._success_count >= self.config.success_threshold:
                    logger.info("Circuit breaker: Transition HALF_OPEN -> CLOSED")
                    self._state = CircuitState.CLOSED
                    self._failure_count = 0
                    self._success_count = 0
                    self._state_change_time = time.time()
            elif self._state == CircuitState.CLOSED:
                # Reset failure count on success
                self._failure_count = 0

    async def _on_failure(self) -> None:
        """Record a failed call and update the state machine."""
        async with self._lock:
            self._failure_count += 1
            self._last_failure_time = time.time()

            if self._state == CircuitState.CLOSED:
                if self._failure_count >= self.config.failure_threshold:
                    logger.warning(f"Circuit breaker: Transition CLOSED -> OPEN after {self._failure_count} failures")
                    self._state = CircuitState.OPEN
                    self._state_change_time = time.time()
            elif self._state == CircuitState.HALF_OPEN:
                # A single failure during probing reopens the circuit
                logger.warning("Circuit breaker: Transition HALF_OPEN -> OPEN")
                self._state = CircuitState.OPEN
                self._success_count = 0
                self._state_change_time = time.time()

    def get_stats(self) -> CircuitBreakerStats:
        """Return a snapshot of the circuit breaker counters."""
        return CircuitBreakerStats(
            state=self._state,
            failure_count=self._failure_count,
            success_count=self._success_count,
            last_failure_time=self._last_failure_time,
            last_success_time=self._last_success_time,
            state_change_time=self._state_change_time,
            total_requests=self._total_requests,
            successful_requests=self._successful_requests
        )

    @property
    def state(self) -> CircuitState:
        """Current state of the circuit."""
        return self._state

    @property
    def is_available(self) -> bool:
        """Whether a call would currently be let through.

        An OPEN circuit counts as available once the recovery timeout has
        elapsed, because the next call would move it to HALF_OPEN.
        """
        if self._state == CircuitState.OPEN:
            return self._recovery_timeout_elapsed()
        return True


class CircuitBreakerOpenError(Exception):
    """Raised when a call is rejected because the circuit breaker is open."""
    pass


class CacheManager:
    """Central manager for the caches and the circuit breaker."""

    def __init__(self,
                 cache_size: int = 1000,
                 cache_ttl: float = 300.0,
                 circuit_config: Optional[CircuitBreakerConfig] = None):
        """
        Initialize the cache manager.

        Args:
            cache_size: Maximum size of the tools cache
            cache_ttl: Default TTL of the tools cache, in seconds
            circuit_config: Circuit breaker configuration (defaults are used
                when None)
        """
        self.tools_cache = LRUCache(cache_size, cache_ttl)
        # Responses get half the capacity and half the TTL. True division is
        # required for the TTL: floor division would truncate small float
        # TTLs (e.g. 1.0 // 2 == 0.0).
        self.response_cache = LRUCache(cache_size // 2, cache_ttl / 2)

        circuit_config = circuit_config or CircuitBreakerConfig()
        self.circuit_breaker = CircuitBreaker(circuit_config)

        # Global metrics
        self._start_time = time.time()
        self._metrics = {
            'cache_operations': 0,
            'circuit_breaker_calls': 0,
            'total_errors': 0
        }

        logger.info("CacheManager initialized")

    async def get_tools_cached(self, key: str) -> Optional[Any]:
        """Fetch the tools from the cache."""
        self._metrics['cache_operations'] += 1
        return await self.tools_cache.get(key)

    async def set_tools_cached(self, key: str, tools: Any, ttl: Optional[float] = None) -> None:
        """Cache the list of tools."""
        await self.tools_cache.set(key, tools, ttl)

    async def get_response_cached(self, key: str) -> Optional[Any]:
        """Fetch a response from the cache."""
        self._metrics['cache_operations'] += 1
        return await self.response_cache.get(key)

    async def set_response_cached(self, key: str, response: Any, ttl: Optional[float] = None) -> None:
        """Cache a response."""
        await self.response_cache.set(key, response, ttl)

    async def protected_call(self, coro_func, *args, **kwargs) -> Any:
        """
        Run a call under the protection of the circuit breaker.

        Args:
            coro_func: Coroutine function to protect
            *args, **kwargs: Arguments forwarded to ``coro_func``

        Returns:
            Result of the function

        Raises:
            Exception: Whatever the circuit breaker raises (including
                CircuitBreakerOpenError); every such error is counted in
                ``total_errors`` before being re-raised.
        """
        self._metrics['circuit_breaker_calls'] += 1

        try:
            return await self.circuit_breaker.call(coro_func, *args, **kwargs)
        except Exception:
            self._metrics['total_errors'] += 1
            raise

    async def cleanup(self) -> Dict[str, int]:
        """
        Remove expired entries from both caches.

        Returns:
            Cleanup statistics
        """
        tools_cleaned = await self.tools_cache.cleanup_expired()
        responses_cleaned = await self.response_cache.cleanup_expired()

        total_cleaned = tools_cleaned + responses_cleaned

        if total_cleaned > 0:
            logger.info(f"Cache cleanup: {total_cleaned} entries removed")

        return {
            'tools_cleaned': tools_cleaned,
            'responses_cleaned': responses_cleaned,
            'total_cleaned': total_cleaned
        }

    async def clear_all_caches(self) -> None:
        """Empty every cache."""
        await self.tools_cache.clear()
        await self.response_cache.clear()
        logger.info("All caches cleared")

    def get_metrics(self) -> Dict[str, Any]:
        """Return the full set of metrics as a snapshot dictionary."""
        uptime = time.time() - self._start_time

        tools_stats = self.tools_cache.get_stats()
        response_stats = self.response_cache.get_stats()
        circuit_stats = self.circuit_breaker.get_stats()

        def _iso(timestamp: Optional[float]) -> Optional[str]:
            # Local-time ISO 8601 string, or None when no event was recorded
            if timestamp is None:
                return None
            return datetime.fromtimestamp(timestamp).isoformat()

        if circuit_stats.total_requests > 0:
            success_rate = circuit_stats.successful_requests / circuit_stats.total_requests * 100
        else:
            success_rate = 0

        return {
            'uptime_seconds': round(uptime, 2),
            # Copy so that callers cannot mutate the internal counters
            'global_metrics': dict(self._metrics),
            'tools_cache': tools_stats,
            'response_cache': response_stats,
            'circuit_breaker': {
                'state': circuit_stats.state.value,
                'failure_count': circuit_stats.failure_count,
                'success_count': circuit_stats.success_count,
                'total_requests': circuit_stats.total_requests,
                'successful_requests': circuit_stats.successful_requests,
                'success_rate_percent': round(success_rate, 2),
                'last_failure': _iso(circuit_stats.last_failure_time),
                'last_success': _iso(circuit_stats.last_success_time),
                'state_duration_seconds': round(time.time() - circuit_stats.state_change_time, 2),
                'is_available': self.circuit_breaker.is_available
            }
        }


# Global cache manager instance
cache_manager = CacheManager()

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/Jonathan97480/McpHomeAssistant'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.