Skip to main content
Glama
instance_manager.py7.21 kB
"""HippoRAG instance manager with TTL functionality.""" import threading import time import logging import os from datetime import datetime, timedelta from typing import Dict, Optional, Set from hipporag import HippoRAG from hipporag.utils.config_utils import BaseConfig from .config import get_config logger = logging.getLogger(__name__) class HippoRAGInstanceManager: """ Manager for HippoRAG instances with TTL-based cleanup. This class manages HippoRAG instances and automatically offloads instances that haven't been accessed for a specified period of time. """ def __init__(self): """Initialize the instance manager.""" self._instances: Dict[str, HippoRAG] = {} self._last_accessed: Dict[str, datetime] = {} self._lock = threading.RLock() self._cleanup_thread = threading.Thread( target=self._cleanup_loop, daemon=True, name="hipporag-instance-cleanup" ) self._running = True self._cleanup_thread.start() logger.info("HippoRAG instance manager initialized with TTL cleanup") # Get configuration self._config = get_config() # Ensure memory directory exists os.makedirs(self._config.memory_dir, exist_ok=True) async def get(self, session_id: str) -> HippoRAG: """ Get a HippoRAG instance for the given session ID. If the instance doesn't exist, it will be created. Args: session_id: The session ID Returns: The HippoRAG instance """ with self._lock: instance = self._instances.get(session_id) if instance: # Update last access time self._last_accessed[session_id] = datetime.now() logger.debug(f"Accessed HippoRAG instance for session {session_id}") else: # Create a new instance if it doesn't exist await self.create_memory(session_id) instance = self._instances.get(session_id) return instance def add(self, session_id: str, instance: HippoRAG) -> None: """ Add a HippoRAG instance for the given session ID. Args: session_id: The session ID instance: The HippoRAG instance """ with self._lock: self._instances[session_id] = instance self._last_accessed[session_id] = datetime.now() logger.info(f"Added HippoRAG instance for session {session_id}") def remove(self, session_id: str) -> None: """ Remove a HippoRAG instance for the given session ID. Args: session_id: The session ID """ with self._lock: if session_id in self._instances: del self._instances[session_id] del self._last_accessed[session_id] logger.info(f"Removed HippoRAG instance for session {session_id}") def contains(self, session_id: str) -> bool: """ Check if a HippoRAG instance exists for the given session ID. Args: session_id: The session ID Returns: True if the instance exists, False otherwise """ with self._lock: return session_id in self._instances def get_all_session_ids(self) -> Set[str]: """ Get all session IDs with active instances. Returns: A set of session IDs """ with self._lock: return set(self._instances.keys()) def _cleanup_loop(self) -> None: """Background thread that periodically cleans up expired instances.""" while self._running: try: self._cleanup_expired_instances() except Exception as e: logger.error(f"Error in instance cleanup: {str(e)}") # Sleep for a minute before checking again time.sleep(60) def _cleanup_expired_instances(self) -> None: """Clean up instances that have expired based on TTL.""" config = get_config() ttl_minutes = config.instance_ttl_minutes if ttl_minutes <= 0: # TTL is disabled return cutoff_time = datetime.now() - timedelta(minutes=ttl_minutes) expired_sessions = [] with self._lock: # Find expired sessions for session_id, last_accessed in self._last_accessed.items(): if last_accessed < cutoff_time: expired_sessions.append(session_id) # Remove expired sessions for session_id in expired_sessions: del self._instances[session_id] del self._last_accessed[session_id] logger.info(f"Offloaded inactive HippoRAG instance for session {session_id} (TTL expired)") if expired_sessions: logger.info(f"Cleaned up {len(expired_sessions)} inactive HippoRAG instances") def shutdown(self) -> None: """Shutdown the instance manager and stop the cleanup thread.""" self._running = False if self._cleanup_thread.is_alive(): self._cleanup_thread.join(timeout=2.0) logger.info("HippoRAG instance manager shutdown") def _get_session_path(self, session_id: str) -> str: """Get the path for storing session data.""" return os.path.join(self._config.memory_dir, f"session_{session_id}") def _ensure_session_exists(self, session_id: str) -> None: """Ensure that a session directory exists.""" session_path = self._get_session_path(session_id) os.makedirs(session_path, exist_ok=True) def _initialize_hipporag(self, session_id: str) -> HippoRAG: """Initialize HippoRAG for a session.""" session_path = self._get_session_path(session_id) hippo_config = BaseConfig() hippo_config.save_dir = session_path # Apply HippoRAG configuration from our config for key, value in self._config.hipporag_config.items(): if hasattr(hippo_config, key) and value is not None: setattr(hippo_config, key, value) return HippoRAG( global_config=hippo_config, save_dir=session_path ) async def create_memory(self, session_id: str) -> None: """Create a new memory for a given chat session. Args: session_id: Unique identifier for the chat session. """ # Check if session already exists if not self.contains(session_id): # Create session directory self._ensure_session_exists(session_id) # Initialize HippoRAG for knowledge graph hipporag_instance = self._initialize_hipporag(session_id) # Store HippoRAG instance in the manager self.add(session_id, hipporag_instance)

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/ddkang1/mcp-mem'

If you have feedback or need assistance with the MCP directory API, please join our Discord server