"""
Hierarchical Entity Cache System
Provides in-memory tree caches for hierarchical entities (locations, departments)
with fast lookups and path generation capabilities.
Based on the OptimizedLocationCache pattern from treiTree.py but generalized
to support multiple entity types.
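Example (illustrative quick start, assuming an Elasticsearch backend is reachable):
    cache_manager = initialize_hierarchy_caches("apolo")
    if cache_manager and cache_manager.is_initialized():
        locations = cache_manager.get_location_cache()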
"""
import sys
import bisect
import logging
from dataclasses import dataclass
from typing import Any, Dict, List, Optional, Tuple, Union
from collections import defaultdict
logger = logging.getLogger(__name__)
@dataclass
class HierarchicalNode:
"""
Lightweight hierarchical node with minimal memory footprint.
Represents a single entity in a hierarchy (location, department, etc.)
"""
name: str
id: Union[str, int]
parent_id: Optional[Union[str, int]] = None
def __post_init__(self):
# Intern strings to reduce memory usage for duplicate names
if isinstance(self.name, str):
self.name = sys.intern(self.name)
if isinstance(self.id, str):
self.id = sys.intern(self.id)
if isinstance(self.parent_id, str):
self.parent_id = sys.intern(self.parent_id)
    def __repr__(self):
        return f"HierarchicalNode(name={self.name!r}, id={self.id!r})"
class HierarchicalCache:
"""
High-performance hierarchical entity cache.
Features:
    - O(1) ID lookups via hash map (dict)
- O(log N) name searches via sorted arrays + binary search
- O(1) path lookups via pre-computed cache
- Handles deep hierarchies efficiently
- Thread-safe for read operations after construction
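    Example (illustrative):
        >>> cache = HierarchicalCache(entity_type="location")
        >>> cache.add_node("USA", 1)
        >>> cache.add_node("New York", 2, parent_id=1)
        >>> cache.get_full_path_name(2)
        'USA -> New York'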
"""
def __init__(self, entity_type: str = "entity"):
"""
Initialize hierarchical cache.
Args:
entity_type: Type of entity being cached (for logging/debugging)
"""
self.entity_type = entity_type
# Primary storage: O(1) ID lookups
self.nodes_by_id: Dict[Union[str, int], HierarchicalNode] = {}
# Sorted indices for binary search: O(log N) lookups
self.names_sorted: List[Tuple[str, Union[str, int]]] = [] # (name, id)
self.names_lower_sorted: List[Tuple[str, Union[str, int]]] = [] # (name.lower(), id)
        # Lowercased-name-to-IDs mapping for case-insensitive lookups and duplicate handling
        self.name_to_ids: Dict[str, List[Union[str, int]]] = defaultdict(list)
# Pre-computed paths cache: O(1) path lookups
self.paths_cache: Dict[Union[str, int], str] = {}
self.paths_name_cache: Dict[Union[str, int], str] = {}
# Children mapping for hierarchy traversal
self.children_by_parent: Dict[Union[str, int], List[Union[str, int]]] = defaultdict(list)
# Build state
self._indices_built = False
self._paths_computed = False
def add_node(self, name: str, id_: Union[str, int], parent_id: Optional[Union[str, int]] = None):
"""
Add a node to the cache.
Args:
name: Entity name
id_: Unique entity identifier
parent_id: Parent entity ID (None for root nodes)
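        Example (illustrative):
            >>> cache.add_node("New York Office", 123, parent_id=100)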
"""
# Create and store node
node = HierarchicalNode(name, id_, parent_id)
self.nodes_by_id[id_] = node
# Track children for hierarchy
if parent_id is not None:
self.children_by_parent[parent_id].append(id_)
# Track name duplicates
self.name_to_ids[name.lower()].append(id_)
# Mark indices as needing rebuild
self._indices_built = False
self._paths_computed = False
def update_node(self, id_: Union[str, int], name: str, parent_id: Optional[Union[str, int]] = None) -> bool:
"""
Update an existing node in the cache, or add it if it doesn't exist (upsert).
This method replaces an existing node's name and parent_id while maintaining
cache integrity. If the node doesn't exist, it will be added to the cache.
All related indices and tracking structures are updated.
Args:
id_: Unique entity identifier of the node to update/add
name: New entity name
parent_id: New parent entity ID (None for root nodes)
Returns:
            True (missing nodes are added, so the upsert always succeeds)
Example:
>>> cache.update_node(123, "New York Office", parent_id=100)
True
"""
# Check if node exists - if not, add it
if id_ not in self.nodes_by_id:
logger.debug(f"Node {id_} not found in cache, adding it")
self.add_node(name, id_, parent_id)
return True
old_node = self.nodes_by_id[id_]
old_name = old_node.name
old_parent_id = old_node.parent_id
# Remove old name from name tracking
if old_name.lower() in self.name_to_ids:
try:
self.name_to_ids[old_name.lower()].remove(id_)
# Clean up empty lists
if not self.name_to_ids[old_name.lower()]:
del self.name_to_ids[old_name.lower()]
except ValueError:
logger.warning(f"Node {id_} not found in name_to_ids for '{old_name}'")
# Remove from old parent's children list
if old_parent_id is not None and old_parent_id in self.children_by_parent:
try:
self.children_by_parent[old_parent_id].remove(id_)
# Clean up empty lists
if not self.children_by_parent[old_parent_id]:
del self.children_by_parent[old_parent_id]
except ValueError:
logger.warning(f"Node {id_} not found in children list of parent {old_parent_id}")
# Create updated node
updated_node = HierarchicalNode(name, id_, parent_id)
self.nodes_by_id[id_] = updated_node
# Add to new parent's children list
if parent_id is not None:
self.children_by_parent[parent_id].append(id_)
# Track new name
self.name_to_ids[name.lower()].append(id_)
# Mark indices as needing rebuild
self._indices_built = False
self._paths_computed = False
logger.debug(
f"Updated {self.entity_type} node {id_}: "
f"name '{old_name}' -> '{name}', "
f"parent {old_parent_id} -> {parent_id}"
)
return True
def remove_node(self, id_: Union[str, int]) -> bool:
"""
Remove a node from the cache by its ID.
This method removes a node and updates all related indices and tracking structures.
If the node has children, they will be orphaned (parent_id set to None).
Args:
id_: Unique entity identifier of the node to remove
Returns:
True if node was removed successfully, False if node doesn't exist
Example:
>>> cache.remove_node(123)
True
"""
# Check if node exists
if id_ not in self.nodes_by_id:
logger.warning(f"Cannot remove node {id_}: node not found in cache")
return False
node = self.nodes_by_id[id_]
node_name = node.name
node_parent_id = node.parent_id
# Handle children - orphan them by setting their parent_id to None
if id_ in self.children_by_parent:
child_ids = self.children_by_parent[id_].copy()
for child_id in child_ids:
if child_id in self.nodes_by_id:
child_node = self.nodes_by_id[child_id]
# Create new node with parent_id = None
orphaned_node = HierarchicalNode(child_node.name, child_node.id, None)
self.nodes_by_id[child_id] = orphaned_node
# Remove the children mapping
del self.children_by_parent[id_]
logger.warning(f"Orphaned {len(child_ids)} children of node {id_}")
# Remove from parent's children list
if node_parent_id is not None and node_parent_id in self.children_by_parent:
try:
self.children_by_parent[node_parent_id].remove(id_)
# Clean up empty lists
if not self.children_by_parent[node_parent_id]:
del self.children_by_parent[node_parent_id]
except ValueError:
logger.warning(f"Node {id_} not found in children list of parent {node_parent_id}")
# Remove from name tracking
if node_name.lower() in self.name_to_ids:
try:
self.name_to_ids[node_name.lower()].remove(id_)
# Clean up empty lists
if not self.name_to_ids[node_name.lower()]:
del self.name_to_ids[node_name.lower()]
except ValueError:
logger.warning(f"Node {id_} not found in name_to_ids for '{node_name}'")
# Remove from paths caches
self.paths_cache.pop(id_, None)
self.paths_name_cache.pop(id_, None)
# Remove from primary storage
del self.nodes_by_id[id_]
# Mark indices as needing rebuild
self._indices_built = False
self._paths_computed = False
logger.debug(f"Removed {self.entity_type} node {id_} ('{node_name}')")
return True
def _build_indices(self):
"""Build sorted indices for fast searching. Call after all nodes added."""
if self._indices_built:
return
# Build sorted name indices for binary search
name_id_pairs = [(node.name, node.id) for node in self.nodes_by_id.values()]
self.names_sorted = sorted(name_id_pairs, key=lambda x: x[0])
        # Build case-insensitive sorted index (lowercased names)
name_lower_id_pairs = [(node.name.lower(), node.id) for node in self.nodes_by_id.values()]
self.names_lower_sorted = sorted(name_lower_id_pairs, key=lambda x: x[0])
self._indices_built = True
def _compute_paths(self):
"""Pre-compute all entity paths for O(1) access."""
if self._paths_computed:
return
self._build_indices() # Ensure indices are built first
# Compute paths for all nodes
for node_id, node in self.nodes_by_id.items():
# Build ID path
id_path = self._build_id_path(node_id)
self.paths_cache[node_id] = " -> ".join(str(id_) for id_ in id_path)
# Build name path
name_path = self._build_name_path(node_id)
self.paths_name_cache[node_id] = " -> ".join(name_path)
self._paths_computed = True
def _build_id_path(self, node_id: Union[str, int]) -> List[Union[str, int]]:
"""Build path of IDs from root to given node."""
path = []
current_id = node_id
visited = set() # Prevent infinite loops
while current_id is not None and current_id in self.nodes_by_id:
if current_id in visited:
logger.warning(f"Circular reference detected in {self.entity_type} hierarchy at node {current_id}")
break # Circular reference detected
visited.add(current_id)
path.append(current_id)
current_id = self.nodes_by_id[current_id].parent_id
return list(reversed(path))
def _build_name_path(self, node_id: Union[str, int]) -> List[str]:
"""Build path of names from root to given node."""
id_path = self._build_id_path(node_id)
return [self.nodes_by_id[id_].name for id_ in id_path]
def search_by_name_exact(self, name: str) -> List[HierarchicalNode]:
"""
        Find nodes whose name exactly matches (matching is case-insensitive).
Args:
name: Exact name to search for
Returns:
List of matching HierarchicalNode objects
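        Example (illustrative, for a cache containing a "New York" node with ID 2):
            >>> cache.search_by_name_exact("new york")
            [HierarchicalNode(name='New York', id=2)]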
"""
self._build_indices()
# Handle case-insensitive search
matching_ids = self.name_to_ids.get(name.lower(), [])
return [self.nodes_by_id[id_] for id_ in matching_ids]
def search_by_name_prefix(self, prefix: str) -> List[HierarchicalNode]:
"""
Find nodes whose name starts with given prefix.
Args:
prefix: Name prefix to search for
Returns:
List of matching HierarchicalNode objects
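        Example (illustrative; matching is case-insensitive):
            >>> [n.name for n in cache.search_by_name_prefix("new")]
            ['New York']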
"""
self._build_indices()
prefix_lower = prefix.lower()
results = []
        # Binary search for the first candidate; a 1-tuple key compares only
        # the name component, avoiding str-vs-int comparisons against IDs
        start_idx = bisect.bisect_left(self.names_lower_sorted, (prefix_lower,))
# Collect all matches
for i in range(start_idx, len(self.names_lower_sorted)):
name_lower, node_id = self.names_lower_sorted[i]
if not name_lower.startswith(prefix_lower):
break
results.append(self.nodes_by_id[node_id])
return results
def get_full_path(self, node_id: Union[str, int]) -> str:
"""
Return the full path (IDs) of a given node.
Args:
node_id: Node identifier
Returns:
Path string with IDs separated by " -> "
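        Example (illustrative, for a node with ID 2 under root ID 1):
            >>> cache.get_full_path(2)
            '1 -> 2'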
"""
self._compute_paths()
return self.paths_cache.get(node_id, "")
def get_full_path_name(self, identifier: Union[str, int]) -> Union[str, List[str]]:
"""
Return full path (names) given either node_id or name.
Args:
identifier: Either a node ID or an entity name
Returns:
            Path string with names, a list of path strings if the name matches
            multiple nodes, or an empty string if no match is found
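        Example (illustrative, for a "New York" node under a "USA" root):
            >>> cache.get_full_path_name("New York")
            'USA -> New York'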
"""
self._compute_paths()
# Try as node ID first
if identifier in self.nodes_by_id:
return self.paths_name_cache.get(identifier, "")
# Try as name
matching_nodes = self.search_by_name_exact(str(identifier))
if not matching_nodes:
return ""
if len(matching_nodes) == 1:
return self.paths_name_cache.get(matching_nodes[0].id, "")
# Multiple matches - return all paths
return [self.paths_name_cache.get(node.id, "") for node in matching_nodes]
def get_children(self, node_id: Union[str, int]) -> List[HierarchicalNode]:
"""
Get direct children of a node.
Args:
node_id: Parent node identifier
Returns:
List of child HierarchicalNode objects
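        Example (illustrative):
            >>> [child.name for child in cache.get_children(1)]
            ['New York']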
"""
child_ids = self.children_by_parent.get(node_id, [])
return [self.nodes_by_id[child_id] for child_id in child_ids]
def get_stats(self) -> Dict[str, Union[int, float]]:
"""
Get cache statistics.
Returns:
Dictionary with cache statistics
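        Example (illustrative, for a two-node USA -> New York tree):
            >>> stats = cache.get_stats()
            >>> stats['total_nodes'], stats['max_depth']
            (2, 1)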
"""
total_nodes = len(self.nodes_by_id)
root_nodes = sum(1 for node in self.nodes_by_id.values() if node.parent_id is None)
# Calculate depth statistics
depths = []
for node_id in self.nodes_by_id:
depth = len(self._build_id_path(node_id)) - 1
depths.append(depth)
return {
'entity_type': self.entity_type,
'total_nodes': total_nodes,
'root_nodes': root_nodes,
'max_depth': max(depths) if depths else 0,
'avg_depth': sum(depths) / len(depths) if depths else 0,
'indices_built': self._indices_built,
'paths_computed': self._paths_computed
}
# -------------------------------
# Elasticsearch Data Loaders
# -------------------------------
class BaseEntityLoader:
"""
Base class for loading hierarchical entities from Elasticsearch.
Handles common Elasticsearch operations and provides template for
entity-specific field mapping.
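    Example subclass (illustrative sketch; "team" is a hypothetical entity type):
        class TeamCacheLoader(BaseEntityLoader):
            def __init__(self, tenant_id: str = "apolo"):
                super().__init__(tenant_id, "team", "team")
            def _process_batch(self, cache, hits):
                # Map entity-specific ES fields onto cache.add_node(...)
                ...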
"""
def __init__(self, tenant_id: str, entity_type: str, index_suffix: str):
"""
Initialize entity loader.
Args:
tenant_id: Tenant identifier for index naming
entity_type: Type of entity (for logging)
index_suffix: Index suffix (e.g., "location", "department")
"""
self.tenant_id = tenant_id
self.entity_type = entity_type
self.index_name = f"{tenant_id}_{index_suffix}"
self.es_client = None
def connect(self) -> bool:
"""Connect to Elasticsearch using existing client infrastructure."""
try:
from elasticsearch_client import get_elasticsearch_client
es_wrapper = get_elasticsearch_client()
if es_wrapper.connect():
self.es_client = es_wrapper.get_client()
logger.info(f"Connected to Elasticsearch for {self.entity_type} loading")
return True
else:
logger.error(f"Failed to connect to Elasticsearch for {self.entity_type}")
return False
except ImportError as e:
logger.error(f"elasticsearch_client module not found: {e}")
return False
except Exception as e:
logger.error(f"Failed to connect to Elasticsearch: {e}", exc_info=True)
return False
def load_entities(self, cache: HierarchicalCache) -> bool:
"""
Load all entities from Elasticsearch into the cache.
Args:
cache: HierarchicalCache instance to populate
Returns:
True if successful, False otherwise
"""
if not self.es_client:
logger.error(f"Not connected to Elasticsearch for {self.entity_type}")
return False
try:
# Check if index exists
if not self.es_client.indices.exists(index=self.index_name):
logger.warning(f"Index '{self.index_name}' does not exist")
return False
# Get all entities using scroll API for large datasets
query = {
"query": {"match_all": {}},
"size": 1000 # Process in batches
}
response = self.es_client.search(
index=self.index_name,
body=query,
scroll='2m'
)
scroll_id = response['_scroll_id']
hits = response['hits']['hits']
total_loaded = 0
# Process first batch
total_loaded += self._process_batch(cache, hits)
# Process remaining batches
while len(hits) > 0:
                response = self.es_client.scroll(scroll_id=scroll_id, scroll='2m')
                # The scroll ID can change between pages; always use the latest one
                scroll_id = response.get('_scroll_id', scroll_id)
                hits = response['hits']['hits']
total_loaded += self._process_batch(cache, hits)
# Clear scroll
self.es_client.clear_scroll(scroll_id=scroll_id)
logger.info(f"✅ Loaded {total_loaded} {self.entity_type} entities from Elasticsearch")
return True
except Exception as e:
logger.error(f"Error loading {self.entity_type} entities: {e}", exc_info=True)
return False
def _process_batch(self, cache: HierarchicalCache, hits: list) -> int:
"""
Process a batch of Elasticsearch hits.
Must be implemented by subclasses to handle entity-specific field mapping.
Args:
cache: HierarchicalCache to populate
hits: List of Elasticsearch hit documents
Returns:
Number of entities successfully loaded
"""
raise NotImplementedError("Subclasses must implement _process_batch")
class LocationCacheLoader(BaseEntityLoader):
"""Loads location entities from Elasticsearch into HierarchicalCache."""
def __init__(self, tenant_id: str = "apolo"):
"""
Initialize location loader.
Args:
tenant_id: Tenant identifier for index naming
"""
super().__init__(tenant_id, "location", "location")
def _process_batch(self, cache: HierarchicalCache, hits: list) -> int:
"""Process a batch of location documents."""
loaded_count = 0
for hit in hits:
try:
source = hit['_source']
# Extract location data based on ES schema
location_id = source.get('dbid')
location_name = source.get('location_name', '')
parent_id = source.get('location_parentid')
# Validate required fields
if location_id is None or not location_name:
logger.warning(f"Skipping location with missing required fields: {hit.get('_id', 'unknown')}")
continue
# Convert parent_id = 0 to None (root nodes)
if parent_id == 0:
parent_id = None
# Add to cache
cache.add_node(location_name, location_id, parent_id)
loaded_count += 1
except Exception as e:
logger.warning(f"Error processing location {hit.get('_id', 'unknown')}: {e}")
continue
return loaded_count
class DepartmentCacheLoader(BaseEntityLoader):
"""Loads department entities from Elasticsearch into HierarchicalCache."""
def __init__(self, tenant_id: str = "apolo"):
"""
Initialize department loader.
Args:
tenant_id: Tenant identifier for index naming
"""
super().__init__(tenant_id, "department", "department")
def _process_batch(self, cache: HierarchicalCache, hits: list) -> int:
"""Process a batch of department documents."""
loaded_count = 0
for hit in hits:
try:
source = hit['_source']
# Extract department data based on ES schema
department_id = source.get('dbid')
department_name = source.get('department_name', '')
parent_id = source.get('department_parentid')
# Validate required fields
if department_id is None or not department_name:
logger.warning(f"Skipping department with missing required fields: {hit.get('_id', 'unknown')}")
continue
# Convert parent_id = 0 to None (root nodes)
if parent_id == 0:
parent_id = None
# Add to cache
cache.add_node(department_name, department_id, parent_id)
loaded_count += 1
except Exception as e:
logger.warning(f"Error processing department {hit.get('_id', 'unknown')}: {e}")
continue
return loaded_count
# -------------------------------
# Hierarchy Cache Manager
# -------------------------------
class HierarchyCacheManager:
"""
Manages multiple hierarchical entity caches (locations, departments).
Provides a unified interface for accessing and managing separate tree
structures for different entity types. Implements singleton pattern
to ensure single initialization.
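    Example (illustrative; requires a reachable Elasticsearch backend):
        >>> manager = HierarchyCacheManager.get_instance()
        >>> manager.initialize("apolo")
        True
        >>> location_cache = manager.get_location_cache()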
"""
_instance: Optional['HierarchyCacheManager'] = None
_initialized: bool = False
def __init__(self):
"""Initialize cache manager (use get_instance() instead)."""
self.location_cache: Optional[HierarchicalCache] = None
self.department_cache: Optional[HierarchicalCache] = None
self._tenant_id: Optional[str] = None
@classmethod
def get_instance(cls) -> 'HierarchyCacheManager':
"""
Get singleton instance of cache manager.
Returns:
HierarchyCacheManager instance
"""
if cls._instance is None:
cls._instance = cls()
return cls._instance
def initialize(self, tenant_id: str) -> bool:
"""
Initialize both location and department caches.
Args:
tenant_id: Tenant identifier for index naming
Returns:
True if at least one cache was successfully initialized
"""
if self._initialized:
logger.info("Hierarchy caches already initialized")
return True
self._tenant_id = tenant_id
logger.info(f"Initializing hierarchy caches for tenant '{tenant_id}'...")
location_success = self._initialize_location_cache(tenant_id)
department_success = self._initialize_department_cache(tenant_id)
# Mark as initialized if at least one succeeded
if location_success or department_success:
self._initialized = True
logger.info("✅ Hierarchy cache initialization complete")
self._log_statistics()
return True
else:
logger.error("❌ Failed to initialize any hierarchy caches")
return False
def _initialize_location_cache(self, tenant_id: str) -> bool:
"""Initialize location cache."""
try:
logger.info("Loading location hierarchy...")
# Create cache
self.location_cache = HierarchicalCache(entity_type="location")
# Load data from Elasticsearch
loader = LocationCacheLoader(tenant_id)
if not loader.connect():
logger.error("Failed to connect to Elasticsearch for locations")
return False
if not loader.load_entities(self.location_cache):
logger.error("Failed to load location data")
return False
# Build indices and compute paths
self.location_cache._build_indices()
self.location_cache._compute_paths()
logger.info("✅ Location cache initialized successfully")
return True
except Exception as e:
logger.error(f"Error initializing location cache: {e}", exc_info=True)
self.location_cache = None
return False
def _initialize_department_cache(self, tenant_id: str) -> bool:
"""Initialize department cache."""
try:
logger.info("Loading department hierarchy...")
# Create cache
self.department_cache = HierarchicalCache(entity_type="department")
# Load data from Elasticsearch
loader = DepartmentCacheLoader(tenant_id)
if not loader.connect():
logger.error("Failed to connect to Elasticsearch for departments")
return False
if not loader.load_entities(self.department_cache):
logger.error("Failed to load department data")
return False
# Build indices and compute paths
self.department_cache._build_indices()
self.department_cache._compute_paths()
logger.info("✅ Department cache initialized successfully")
return True
except Exception as e:
logger.error(f"Error initializing department cache: {e}", exc_info=True)
self.department_cache = None
return False
def _log_statistics(self):
"""Log statistics for all initialized caches."""
if self.location_cache:
stats = self.location_cache.get_stats()
logger.info(
f"📊 Location Cache: {stats['total_nodes']} nodes, "
f"{stats['root_nodes']} roots, max depth {stats['max_depth']}"
)
if self.department_cache:
stats = self.department_cache.get_stats()
logger.info(
f"📊 Department Cache: {stats['total_nodes']} nodes, "
f"{stats['root_nodes']} roots, max depth {stats['max_depth']}"
)
def get_location_cache(self) -> Optional[HierarchicalCache]:
"""
Get location cache instance.
Returns:
HierarchicalCache for locations or None if not initialized
"""
return self.location_cache
def get_department_cache(self) -> Optional[HierarchicalCache]:
"""
Get department cache instance.
Returns:
HierarchicalCache for departments or None if not initialized
"""
return self.department_cache
def is_initialized(self) -> bool:
"""Check if caches have been initialized."""
return self._initialized
    def get_statistics(self) -> Dict[str, Any]:
"""
Get statistics for all caches.
Returns:
Dictionary with statistics for each cache
"""
stats = {
'tenant_id': self._tenant_id,
'initialized': self._initialized,
'location': self.location_cache.get_stats() if self.location_cache else None,
'department': self.department_cache.get_stats() if self.department_cache else None,
}
return stats
# -------------------------------
# Public API
# -------------------------------
def initialize_hierarchy_caches(tenant_id: str = "apolo") -> Optional[HierarchyCacheManager]:
"""
Initialize hierarchical entity caches for locations and departments.
This is the main entry point for setting up the cache system. It should
be called once during server startup.
Args:
tenant_id: Tenant identifier for index naming (default: "apolo")
Returns:
HierarchyCacheManager instance if successful, None if completely failed
Example:
>>> # During server startup
>>> cache_manager = initialize_hierarchy_caches("apolo")
>>> if cache_manager:
... location_cache = cache_manager.get_location_cache()
... dept_cache = cache_manager.get_department_cache()
"""
try:
logger.info("=" * 60)
logger.info("🚀 Initializing Hierarchy Cache System")
logger.info("=" * 60)
# Get singleton manager instance
manager = HierarchyCacheManager.get_instance()
# Initialize caches
success = manager.initialize(tenant_id)
if success:
logger.info("=" * 60)
logger.info("✅ Hierarchy Cache System Ready")
logger.info("=" * 60)
return manager
else:
logger.warning("⚠️ Hierarchy cache initialization failed, but server can continue")
return manager # Return manager even if initialization failed
except Exception as e:
logger.error(f"❌ Critical error during hierarchy cache initialization: {e}", exc_info=True)
logger.warning("Server will continue without hierarchy caches")
return None
def get_hierarchy_cache_manager() -> Optional[HierarchyCacheManager]:
"""
Get the singleton hierarchy cache manager instance.
Returns:
        HierarchyCacheManager instance, or None if the manager has never been created
Example:
>>> manager = get_hierarchy_cache_manager()
>>> if manager and manager.is_initialized():
... location_cache = manager.get_location_cache()
"""
return HierarchyCacheManager.get_instance() if HierarchyCacheManager._instance else None
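if __name__ == "__main__":
    # Minimal offline demo of the in-memory cache API. Illustrative only:
    # the names and IDs below are hypothetical, and no Elasticsearch is needed.
    logging.basicConfig(level=logging.INFO)
    demo = HierarchicalCache(entity_type="location")
    demo.add_node("Headquarters", 1)
    demo.add_node("New York Office", 2, parent_id=1)
    demo.add_node("Brooklyn Annex", 3, parent_id=2)
    print(demo.get_full_path_name(3))  # Headquarters -> New York Office -> Brooklyn Annex
    print(demo.get_full_path(3))       # 1 -> 2 -> 3
    print([n.name for n in demo.search_by_name_prefix("new")])  # one match: New York Office
    print(demo.get_stats()["max_depth"])  # 2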