Dynamic Per-User Tool Generation MCP Server

hierarchy_cache.py (30.8 kB)
""" Hierarchical Entity Cache System Provides in-memory tree caches for hierarchical entities (locations, departments) with fast lookups and path generation capabilities. Based on the OptimizedLocationCache pattern from treiTree.py but generalized to support multiple entity types. """ import sys import bisect import logging from dataclasses import dataclass from typing import Dict, List, Optional, Tuple, Union from collections import defaultdict logger = logging.getLogger(__name__) @dataclass class HierarchicalNode: """ Lightweight hierarchical node with minimal memory footprint. Represents a single entity in a hierarchy (location, department, etc.) """ name: str id: Union[str, int] parent_id: Optional[Union[str, int]] = None def __post_init__(self): # Intern strings to reduce memory usage for duplicate names if isinstance(self.name, str): self.name = sys.intern(self.name) if isinstance(self.id, str): self.id = sys.intern(self.id) if isinstance(self.parent_id, str): self.parent_id = sys.intern(self.parent_id) def __repr__(self): return f"HierarchicalNode(name='{self.name}', id='{self.id}')" class HierarchicalCache: """ High-performance hierarchical entity cache. Features: - O(1) ID lookups via HashMap - O(log N) name searches via sorted arrays + binary search - O(1) path lookups via pre-computed cache - Handles deep hierarchies efficiently - Thread-safe for read operations after construction """ def __init__(self, entity_type: str = "entity"): """ Initialize hierarchical cache. Args: entity_type: Type of entity being cached (for logging/debugging) """ self.entity_type = entity_type # Primary storage: O(1) ID lookups self.nodes_by_id: Dict[Union[str, int], HierarchicalNode] = {} # Sorted indices for binary search: O(log N) lookups self.names_sorted: List[Tuple[str, Union[str, int]]] = [] # (name, id) self.names_lower_sorted: List[Tuple[str, Union[str, int]]] = [] # (name.lower(), id) # Name-to-IDs mapping for handling duplicates self.name_to_ids: Dict[str, List[Union[str, int]]] = defaultdict(list) # Pre-computed paths cache: O(1) path lookups self.paths_cache: Dict[Union[str, int], str] = {} self.paths_name_cache: Dict[Union[str, int], str] = {} # Children mapping for hierarchy traversal self.children_by_parent: Dict[Union[str, int], List[Union[str, int]]] = defaultdict(list) # Build state self._indices_built = False self._paths_computed = False def add_node(self, name: str, id_: Union[str, int], parent_id: Optional[Union[str, int]] = None): """ Add a node to the cache. Args: name: Entity name id_: Unique entity identifier parent_id: Parent entity ID (None for root nodes) """ # Create and store node node = HierarchicalNode(name, id_, parent_id) self.nodes_by_id[id_] = node # Track children for hierarchy if parent_id is not None: self.children_by_parent[parent_id].append(id_) # Track name duplicates self.name_to_ids[name.lower()].append(id_) # Mark indices as needing rebuild self._indices_built = False self._paths_computed = False def update_node(self, id_: Union[str, int], name: str, parent_id: Optional[Union[str, int]] = None) -> bool: """ Update an existing node in the cache, or add it if it doesn't exist (upsert). This method replaces an existing node's name and parent_id while maintaining cache integrity. If the node doesn't exist, it will be added to the cache. All related indices and tracking structures are updated. 

    def update_node(self, id_: Union[str, int], name: str,
                    parent_id: Optional[Union[str, int]] = None) -> bool:
        """
        Update an existing node in the cache, or add it if it doesn't exist (upsert).

        This method replaces an existing node's name and parent_id while
        maintaining cache integrity. If the node doesn't exist, it will be
        added to the cache. All related indices and tracking structures are
        updated.

        Args:
            id_: Unique entity identifier of the node to update/add
            name: New entity name
            parent_id: New parent entity ID (None for root nodes)

        Returns:
            True if the node was updated/added successfully

        Example:
            >>> cache.update_node(123, "New York Office", parent_id=100)
            True
        """
        # Check if node exists - if not, add it
        if id_ not in self.nodes_by_id:
            logger.debug(f"Node {id_} not found in cache, adding it")
            self.add_node(name, id_, parent_id)
            return True

        old_node = self.nodes_by_id[id_]
        old_name = old_node.name
        old_parent_id = old_node.parent_id

        # Remove old name from name tracking
        if old_name.lower() in self.name_to_ids:
            try:
                self.name_to_ids[old_name.lower()].remove(id_)
                # Clean up empty lists
                if not self.name_to_ids[old_name.lower()]:
                    del self.name_to_ids[old_name.lower()]
            except ValueError:
                logger.warning(f"Node {id_} not found in name_to_ids for '{old_name}'")

        # Remove from old parent's children list
        if old_parent_id is not None and old_parent_id in self.children_by_parent:
            try:
                self.children_by_parent[old_parent_id].remove(id_)
                # Clean up empty lists
                if not self.children_by_parent[old_parent_id]:
                    del self.children_by_parent[old_parent_id]
            except ValueError:
                logger.warning(f"Node {id_} not found in children list of parent {old_parent_id}")

        # Create updated node
        updated_node = HierarchicalNode(name, id_, parent_id)
        self.nodes_by_id[id_] = updated_node

        # Add to new parent's children list
        if parent_id is not None:
            self.children_by_parent[parent_id].append(id_)

        # Track new name
        self.name_to_ids[name.lower()].append(id_)

        # Mark indices as needing rebuild
        self._indices_built = False
        self._paths_computed = False

        logger.debug(
            f"Updated {self.entity_type} node {id_}: "
            f"name '{old_name}' -> '{name}', "
            f"parent {old_parent_id} -> {parent_id}"
        )
        return True

    def remove_node(self, id_: Union[str, int]) -> bool:
        """
        Remove a node from the cache by its ID.

        This method removes a node and updates all related indices and
        tracking structures. If the node has children, they will be orphaned
        (parent_id set to None).

        Args:
            id_: Unique entity identifier of the node to remove

        Returns:
            True if node was removed successfully, False if node doesn't exist

        Example:
            >>> cache.remove_node(123)
            True
        """
        # Check if node exists
        if id_ not in self.nodes_by_id:
            logger.warning(f"Cannot remove node {id_}: node not found in cache")
            return False

        node = self.nodes_by_id[id_]
        node_name = node.name
        node_parent_id = node.parent_id

        # Handle children - orphan them by setting their parent_id to None
        if id_ in self.children_by_parent:
            child_ids = self.children_by_parent[id_].copy()
            for child_id in child_ids:
                if child_id in self.nodes_by_id:
                    child_node = self.nodes_by_id[child_id]
                    # Create new node with parent_id = None
                    orphaned_node = HierarchicalNode(child_node.name, child_node.id, None)
                    self.nodes_by_id[child_id] = orphaned_node

            # Remove the children mapping
            del self.children_by_parent[id_]
            logger.warning(f"Orphaned {len(child_ids)} children of node {id_}")

        # Remove from parent's children list
        if node_parent_id is not None and node_parent_id in self.children_by_parent:
            try:
                self.children_by_parent[node_parent_id].remove(id_)
                # Clean up empty lists
                if not self.children_by_parent[node_parent_id]:
                    del self.children_by_parent[node_parent_id]
            except ValueError:
                logger.warning(f"Node {id_} not found in children list of parent {node_parent_id}")

        # Remove from name tracking
        if node_name.lower() in self.name_to_ids:
            try:
                self.name_to_ids[node_name.lower()].remove(id_)
                # Clean up empty lists
                if not self.name_to_ids[node_name.lower()]:
                    del self.name_to_ids[node_name.lower()]
            except ValueError:
                logger.warning(f"Node {id_} not found in name_to_ids for '{node_name}'")

        # Remove from paths caches
        self.paths_cache.pop(id_, None)
        self.paths_name_cache.pop(id_, None)

        # Remove from primary storage
        del self.nodes_by_id[id_]

        # Mark indices as needing rebuild
        self._indices_built = False
        self._paths_computed = False

        logger.debug(f"Removed {self.entity_type} node {id_} ('{node_name}')")
        return True

    def _build_indices(self):
        """Build sorted indices for fast searching. Call after all nodes are added."""
        if self._indices_built:
            return

        # Build sorted name indices for binary search
        name_id_pairs = [(node.name, node.id) for node in self.nodes_by_id.values()]
        self.names_sorted = sorted(name_id_pairs, key=lambda x: x[0])

        # Build lowercase sorted index with lowercase names
        name_lower_id_pairs = [(node.name.lower(), node.id) for node in self.nodes_by_id.values()]
        self.names_lower_sorted = sorted(name_lower_id_pairs, key=lambda x: x[0])

        self._indices_built = True

    def _compute_paths(self):
        """Pre-compute all entity paths for O(1) access."""
        if self._paths_computed:
            return

        self._build_indices()  # Ensure indices are built first

        # Compute paths for all nodes
        for node_id, node in self.nodes_by_id.items():
            # Build ID path
            id_path = self._build_id_path(node_id)
            self.paths_cache[node_id] = " -> ".join(str(id_) for id_ in id_path)

            # Build name path
            name_path = self._build_name_path(node_id)
            self.paths_name_cache[node_id] = " -> ".join(name_path)

        self._paths_computed = True

    def _build_id_path(self, node_id: Union[str, int]) -> List[Union[str, int]]:
        """Build path of IDs from root to given node."""
        path = []
        current_id = node_id
        visited = set()  # Prevent infinite loops

        while current_id is not None and current_id in self.nodes_by_id:
            if current_id in visited:
                logger.warning(
                    f"Circular reference detected in {self.entity_type} hierarchy at node {current_id}"
                )
                break  # Circular reference detected
            visited.add(current_id)
            path.append(current_id)
            current_id = self.nodes_by_id[current_id].parent_id

        return list(reversed(path))

    def _build_name_path(self, node_id: Union[str, int]) -> List[str]:
        """Build path of names from root to given node."""
        id_path = self._build_id_path(node_id)
        return [self.nodes_by_id[id_].name for id_ in id_path]

    def search_by_name_exact(self, name: str) -> List[HierarchicalNode]:
        """
        Find nodes whose name exactly matches (case-insensitive).

        Args:
            name: Exact name to search for

        Returns:
            List of matching HierarchicalNode objects
        """
        self._build_indices()

        # Handle case-insensitive search
        matching_ids = self.name_to_ids.get(name.lower(), [])
        return [self.nodes_by_id[id_] for id_ in matching_ids]

    def search_by_name_prefix(self, prefix: str) -> List[HierarchicalNode]:
        """
        Find nodes whose name starts with the given prefix (case-insensitive).

        Args:
            prefix: Name prefix to search for

        Returns:
            List of matching HierarchicalNode objects
        """
        self._build_indices()

        prefix_lower = prefix.lower()
        results = []

        # Binary search for the first possible match. A 1-tuple probe compares
        # only against the name component of each (name, id) pair, so integer
        # IDs are never compared with strings (which would raise TypeError).
        start_idx = bisect.bisect_left(self.names_lower_sorted, (prefix_lower,))

        # Collect all matches
        for i in range(start_idx, len(self.names_lower_sorted)):
            name_lower, node_id = self.names_lower_sorted[i]
            if not name_lower.startswith(prefix_lower):
                break
            results.append(self.nodes_by_id[node_id])

        return results

    def get_full_path(self, node_id: Union[str, int]) -> str:
        """
        Return the full path (IDs) of a given node.

        Args:
            node_id: Node identifier

        Returns:
            Path string with IDs separated by " -> "
        """
        self._compute_paths()
        return self.paths_cache.get(node_id, "")
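
    # Illustrative sketch: with nodes named "Berlin", "Bern", and "Boston"
    # cached (hypothetical data), a prefix query narrows via binary search:
    #
    #   cache.search_by_name_prefix("Ber")
    #   # -> [HierarchicalNode(name='Berlin', ...), HierarchicalNode(name='Bern', ...)]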

    def get_full_path_name(self, identifier: Union[str, int]) -> Union[str, List[str]]:
        """
        Return the full path (names) given either a node ID or a name.

        Args:
            identifier: Either a node ID or an entity name

        Returns:
            Path string with names, or a list of paths if multiple matches
        """
        self._compute_paths()

        # Try as node ID first
        if identifier in self.nodes_by_id:
            return self.paths_name_cache.get(identifier, "")

        # Try as name
        matching_nodes = self.search_by_name_exact(str(identifier))

        if not matching_nodes:
            return ""

        if len(matching_nodes) == 1:
            return self.paths_name_cache.get(matching_nodes[0].id, "")

        # Multiple matches - return all paths
        return [self.paths_name_cache.get(node.id, "") for node in matching_nodes]

    def get_children(self, node_id: Union[str, int]) -> List[HierarchicalNode]:
        """
        Get direct children of a node.

        Args:
            node_id: Parent node identifier

        Returns:
            List of child HierarchicalNode objects
        """
        child_ids = self.children_by_parent.get(node_id, [])
        return [self.nodes_by_id[child_id] for child_id in child_ids]

    def get_stats(self) -> Dict[str, Any]:
        """
        Get cache statistics.

        Returns:
            Dictionary with cache statistics
        """
        total_nodes = len(self.nodes_by_id)
        root_nodes = sum(1 for node in self.nodes_by_id.values() if node.parent_id is None)

        # Calculate depth statistics
        depths = []
        for node_id in self.nodes_by_id:
            depth = len(self._build_id_path(node_id)) - 1
            depths.append(depth)

        return {
            'entity_type': self.entity_type,
            'total_nodes': total_nodes,
            'root_nodes': root_nodes,
            'max_depth': max(depths) if depths else 0,
            'avg_depth': sum(depths) / len(depths) if depths else 0,
            'indices_built': self._indices_built,
            'paths_computed': self._paths_computed
        }


# -------------------------------
# Elasticsearch Data Loaders
# -------------------------------

class BaseEntityLoader:
    """
    Base class for loading hierarchical entities from Elasticsearch.

    Handles common Elasticsearch operations and provides a template for
    entity-specific field mapping.
    """

    def __init__(self, tenant_id: str, entity_type: str, index_suffix: str):
        """
        Initialize entity loader.

        Args:
            tenant_id: Tenant identifier for index naming
            entity_type: Type of entity (for logging)
            index_suffix: Index suffix (e.g., "location", "department")
        """
        self.tenant_id = tenant_id
        self.entity_type = entity_type
        self.index_name = f"{tenant_id}_{index_suffix}"
        self.es_client = None

    def connect(self) -> bool:
        """Connect to Elasticsearch using the existing client infrastructure."""
        try:
            from elasticsearch_client import get_elasticsearch_client

            es_wrapper = get_elasticsearch_client()
            if es_wrapper.connect():
                self.es_client = es_wrapper.get_client()
                logger.info(f"Connected to Elasticsearch for {self.entity_type} loading")
                return True
            else:
                logger.error(f"Failed to connect to Elasticsearch for {self.entity_type}")
                return False
        except ImportError as e:
            logger.error(f"elasticsearch_client module not found: {e}")
            return False
        except Exception as e:
            logger.error(f"Failed to connect to Elasticsearch: {e}", exc_info=True)
            return False

    def load_entities(self, cache: HierarchicalCache) -> bool:
        """
        Load all entities from Elasticsearch into the cache.

        Args:
            cache: HierarchicalCache instance to populate

        Returns:
            True if successful, False otherwise
        """
        if not self.es_client:
            logger.error(f"Not connected to Elasticsearch for {self.entity_type}")
            return False

        try:
            # Check if index exists
            if not self.es_client.indices.exists(index=self.index_name):
                logger.warning(f"Index '{self.index_name}' does not exist")
                return False

            # Get all entities using the scroll API for large datasets
            query = {
                "query": {"match_all": {}},
                "size": 1000  # Process in batches
            }

            response = self.es_client.search(
                index=self.index_name,
                body=query,
                scroll='2m'
            )

            scroll_id = response['_scroll_id']
            hits = response['hits']['hits']
            total_loaded = 0

            # Process first batch
            total_loaded += self._process_batch(cache, hits)

            # Process remaining batches
            while len(hits) > 0:
                response = self.es_client.scroll(scroll_id=scroll_id, scroll='2m')
                hits = response['hits']['hits']
                total_loaded += self._process_batch(cache, hits)

            # Clear scroll
            self.es_client.clear_scroll(scroll_id=scroll_id)

            logger.info(f"✅ Loaded {total_loaded} {self.entity_type} entities from Elasticsearch")
            return True

        except Exception as e:
            logger.error(f"Error loading {self.entity_type} entities: {e}", exc_info=True)
            return False

    def _process_batch(self, cache: HierarchicalCache, hits: list) -> int:
        """
        Process a batch of Elasticsearch hits.

        Must be implemented by subclasses to handle entity-specific field mapping.

        Args:
            cache: HierarchicalCache to populate
            hits: List of Elasticsearch hit documents

        Returns:
            Number of entities successfully loaded
        """
        raise NotImplementedError("Subclasses must implement _process_batch")


class LocationCacheLoader(BaseEntityLoader):
    """Loads location entities from Elasticsearch into HierarchicalCache."""

    def __init__(self, tenant_id: str = "apolo"):
        """
        Initialize location loader.

        Args:
            tenant_id: Tenant identifier for index naming
        """
        super().__init__(tenant_id, "location", "location")

    def _process_batch(self, cache: HierarchicalCache, hits: list) -> int:
        """Process a batch of location documents."""
        loaded_count = 0

        for hit in hits:
            try:
                source = hit['_source']

                # Extract location data based on the ES schema
                location_id = source.get('dbid')
                location_name = source.get('location_name', '')
                parent_id = source.get('location_parentid')

                # Validate required fields
                if location_id is None or not location_name:
                    logger.warning(
                        f"Skipping location with missing required fields: {hit.get('_id', 'unknown')}"
                    )
                    continue

                # Convert parent_id = 0 to None (root nodes)
                if parent_id == 0:
                    parent_id = None

                # Add to cache
                cache.add_node(location_name, location_id, parent_id)
                loaded_count += 1

            except Exception as e:
                logger.warning(f"Error processing location {hit.get('_id', 'unknown')}: {e}")
                continue

        return loaded_count
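

# Illustrative sketch: supporting a new entity type only requires a small
# subclass that maps its ES fields. The "team" index and field names below
# are hypothetical:
#
#   class TeamCacheLoader(BaseEntityLoader):
#       def __init__(self, tenant_id: str = "apolo"):
#           super().__init__(tenant_id, "team", "team")
#
#       def _process_batch(self, cache: HierarchicalCache, hits: list) -> int:
#           loaded = 0
#           for hit in hits:
#               source = hit['_source']
#               team_id = source.get('dbid')
#               name = source.get('team_name', '')
#               parent_id = source.get('team_parentid') or None  # 0 -> None
#               if team_id is not None and name:
#                   cache.add_node(name, team_id, parent_id)
#                   loaded += 1
#           return loaded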


class DepartmentCacheLoader(BaseEntityLoader):
    """Loads department entities from Elasticsearch into HierarchicalCache."""

    def __init__(self, tenant_id: str = "apolo"):
        """
        Initialize department loader.

        Args:
            tenant_id: Tenant identifier for index naming
        """
        super().__init__(tenant_id, "department", "department")

    def _process_batch(self, cache: HierarchicalCache, hits: list) -> int:
        """Process a batch of department documents."""
        loaded_count = 0

        for hit in hits:
            try:
                source = hit['_source']

                # Extract department data based on the ES schema
                department_id = source.get('dbid')
                department_name = source.get('department_name', '')
                parent_id = source.get('department_parentid')

                # Validate required fields
                if department_id is None or not department_name:
                    logger.warning(
                        f"Skipping department with missing required fields: {hit.get('_id', 'unknown')}"
                    )
                    continue

                # Convert parent_id = 0 to None (root nodes)
                if parent_id == 0:
                    parent_id = None

                # Add to cache
                cache.add_node(department_name, department_id, parent_id)
                loaded_count += 1

            except Exception as e:
                logger.warning(f"Error processing department {hit.get('_id', 'unknown')}: {e}")
                continue

        return loaded_count


# -------------------------------
# Hierarchy Cache Manager
# -------------------------------

class HierarchyCacheManager:
    """
    Manages multiple hierarchical entity caches (locations, departments).

    Provides a unified interface for accessing and managing separate tree
    structures for different entity types. Implements the singleton pattern
    to ensure single initialization.
    """

    _instance: Optional['HierarchyCacheManager'] = None
    _initialized: bool = False

    def __init__(self):
        """Initialize cache manager (use get_instance() instead)."""
        self.location_cache: Optional[HierarchicalCache] = None
        self.department_cache: Optional[HierarchicalCache] = None
        self._tenant_id: Optional[str] = None

    @classmethod
    def get_instance(cls) -> 'HierarchyCacheManager':
        """
        Get the singleton instance of the cache manager.

        Returns:
            HierarchyCacheManager instance
        """
        if cls._instance is None:
            cls._instance = cls()
        return cls._instance

    def initialize(self, tenant_id: str) -> bool:
        """
        Initialize both location and department caches.

        Args:
            tenant_id: Tenant identifier for index naming

        Returns:
            True if at least one cache was successfully initialized
        """
        if self._initialized:
            logger.info("Hierarchy caches already initialized")
            return True

        self._tenant_id = tenant_id
        logger.info(f"Initializing hierarchy caches for tenant '{tenant_id}'...")

        location_success = self._initialize_location_cache(tenant_id)
        department_success = self._initialize_department_cache(tenant_id)

        # Mark as initialized if at least one succeeded
        if location_success or department_success:
            self._initialized = True
            logger.info("✅ Hierarchy cache initialization complete")
            self._log_statistics()
            return True
        else:
            logger.error("❌ Failed to initialize any hierarchy caches")
            return False

    def _initialize_location_cache(self, tenant_id: str) -> bool:
        """Initialize the location cache."""
        try:
            logger.info("Loading location hierarchy...")

            # Create cache
            self.location_cache = HierarchicalCache(entity_type="location")

            # Load data from Elasticsearch
            loader = LocationCacheLoader(tenant_id)
            if not loader.connect():
                logger.error("Failed to connect to Elasticsearch for locations")
                return False

            if not loader.load_entities(self.location_cache):
                logger.error("Failed to load location data")
                return False

            # Build indices and compute paths
            self.location_cache._build_indices()
            self.location_cache._compute_paths()

            logger.info("✅ Location cache initialized successfully")
            return True

        except Exception as e:
            logger.error(f"Error initializing location cache: {e}", exc_info=True)
            self.location_cache = None
            return False

    def _initialize_department_cache(self, tenant_id: str) -> bool:
        """Initialize the department cache."""
        try:
            logger.info("Loading department hierarchy...")

            # Create cache
            self.department_cache = HierarchicalCache(entity_type="department")

            # Load data from Elasticsearch
            loader = DepartmentCacheLoader(tenant_id)
            if not loader.connect():
                logger.error("Failed to connect to Elasticsearch for departments")
                return False

            if not loader.load_entities(self.department_cache):
                logger.error("Failed to load department data")
                return False

            # Build indices and compute paths
            self.department_cache._build_indices()
            self.department_cache._compute_paths()

            logger.info("✅ Department cache initialized successfully")
            return True

        except Exception as e:
            logger.error(f"Error initializing department cache: {e}", exc_info=True)
            self.department_cache = None
            return False

    def _log_statistics(self):
        """Log statistics for all initialized caches."""
        if self.location_cache:
            stats = self.location_cache.get_stats()
            logger.info(
                f"📊 Location Cache: {stats['total_nodes']} nodes, "
                f"{stats['root_nodes']} roots, max depth {stats['max_depth']}"
            )

        if self.department_cache:
            stats = self.department_cache.get_stats()
            logger.info(
                f"📊 Department Cache: {stats['total_nodes']} nodes, "
                f"{stats['root_nodes']} roots, max depth {stats['max_depth']}"
            )

    def get_location_cache(self) -> Optional[HierarchicalCache]:
        """
        Get the location cache instance.

        Returns:
            HierarchicalCache for locations, or None if not initialized
        """
        return self.location_cache

    def get_department_cache(self) -> Optional[HierarchicalCache]:
        """
        Get the department cache instance.

        Returns:
            HierarchicalCache for departments, or None if not initialized
        """
        return self.department_cache

    def is_initialized(self) -> bool:
        """Check if the caches have been initialized."""
        return self._initialized

    def get_statistics(self) -> Dict[str, Any]:
        """
        Get statistics for all caches.

        Returns:
            Dictionary with statistics for each cache
        """
        stats = {
            'tenant_id': self._tenant_id,
            'initialized': self._initialized,
            'location': self.location_cache.get_stats() if self.location_cache else None,
            'department': self.department_cache.get_stats() if self.department_cache else None,
        }
        return stats


# -------------------------------
# Public API
# -------------------------------

def initialize_hierarchy_caches(tenant_id: str = "apolo") -> Optional[HierarchyCacheManager]:
    """
    Initialize hierarchical entity caches for locations and departments.

    This is the main entry point for setting up the cache system. It should
    be called once during server startup.

    Args:
        tenant_id: Tenant identifier for index naming (default: "apolo")

    Returns:
        HierarchyCacheManager instance if successful, None if completely failed

    Example:
        >>> # During server startup
        >>> cache_manager = initialize_hierarchy_caches("apolo")
        >>> if cache_manager:
        ...     location_cache = cache_manager.get_location_cache()
        ...     dept_cache = cache_manager.get_department_cache()
    """
    try:
        logger.info("=" * 60)
        logger.info("🚀 Initializing Hierarchy Cache System")
        logger.info("=" * 60)

        # Get singleton manager instance
        manager = HierarchyCacheManager.get_instance()

        # Initialize caches
        success = manager.initialize(tenant_id)

        if success:
            logger.info("=" * 60)
            logger.info("✅ Hierarchy Cache System Ready")
            logger.info("=" * 60)
            return manager
        else:
            logger.warning("⚠️ Hierarchy cache initialization failed, but server can continue")
            return manager  # Return manager even if initialization failed

    except Exception as e:
        logger.error(f"❌ Critical error during hierarchy cache initialization: {e}", exc_info=True)
        logger.warning("Server will continue without hierarchy caches")
        return None


def get_hierarchy_cache_manager() -> Optional[HierarchyCacheManager]:
    """
    Get the singleton hierarchy cache manager instance.

    Returns:
        HierarchyCacheManager instance, or None if not initialized

    Example:
        >>> manager = get_hierarchy_cache_manager()
        >>> if manager and manager.is_initialized():
        ...     location_cache = manager.get_location_cache()
    """
    return HierarchyCacheManager.get_instance() if HierarchyCacheManager._instance else None
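

# Illustrative end-to-end sketch (the tenant, names, and resulting path are
# hypothetical):
#
#   manager = initialize_hierarchy_caches("apolo")
#   if manager and manager.is_initialized():
#       locations = manager.get_location_cache()
#       if locations:
#           locations.search_by_name_prefix("New")    # O(log N) prefix search
#           locations.get_full_path_name("New York")  # e.g. "USA -> NY -> New York"
#           locations.get_stats()                     # node count, depth, etc.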
