Skip to main content
Glama
frames.py19.2 kB
from dataclasses import dataclass, field from typing import Optional, List, Dict, Any, TYPE_CHECKING from weakref import ref as WeakReference from nabu.core.frame_types import FrameNodeType, EdgeType, ConfidenceTier if TYPE_CHECKING: from tree_sitter import Node as TSNode from .registry import FrameRegistry from .field_info import FieldInfo, ParameterInfo @dataclass(slots=True) # Memory optimization: 40-60% reduction class AstFrameBase: # Core identification id: str type: FrameNodeType name: Optional[str] qualified_name: Optional[str] # Confidence system confidence: float = 1.0 # 0.0-1.0 precision confidence_tier: ConfidenceTier = ConfidenceTier.HIGH provenance: str = "parsed" # parsed/imported/inferred/external/parse_failed resolution_pass: int = 1 # Which parsing pass created this # Tree structure - MULTI-PARENT SUPPORT # Replaced single parent pointer with three data structures for different access patterns parents_by_id: Dict[str, 'AstFrameBase'] = field(default_factory=dict) parents_by_qualified_name: Dict[str, 'AstFrameBase'] = field(default_factory=dict) parents_list: List['AstFrameBase'] = field(default_factory=list) # Preserves insertion order children: List['AstFrameBase'] = field(default_factory=list) # Source location file_path: Optional[str] = None start_line: int = 0 end_line: int = 0 start_byte: int = 0 end_byte: int = 0 content: Optional[str] = None # Language context language: Optional[str] = None # Metadata for extensibility metadata: Dict[str, Any] = field(default_factory=dict) # Registry access (lazy-loaded, addresses memory concerns) _registry: Optional['FrameRegistry'] = None # Tree-sitter integration (direct reference - tree-sitter nodes cannot be weakly referenced) _tree_sitter_node: Optional[Any] = None # Cached heading (computed once during instantiation, used in DB/display) _cached_heading: Optional[str] = None def add_child(self, child: 'AstFrameBase') -> None: """ Add child frame and establish bidirectional multi-parent relationship. Idempotent: calling multiple times with same child has no effect. Thread-safe note: Not thread-safe, assumes single-threaded graph building. """ # Add child to parent's children list (deduplicated) if child not in self.children: self.children.append(child) # Add parent to child's parent structures (deduplicated) if self.id not in child.parents_by_id: child.parents_by_id[self.id] = self # Add to qualified name index if parent has qualified name if self.qualified_name: child.parents_by_qualified_name[self.qualified_name] = self # Add to ordered list (preserves insertion order = "primary" parent is first) child.parents_list.append(self) def get_primary_parent(self) -> Optional['AstFrameBase']: """Get primary parent (first parent added). For backward compatibility.""" return self.parents_list[0] if self.parents_list else None def get_parent_by_type(self, frame_type: FrameNodeType) -> Optional['AstFrameBase']: """Get first parent of specified type (insertion order).""" for parent in self.parents_list: if parent.type == frame_type: return parent return None def get_all_parents_by_type(self, frame_type: FrameNodeType) -> List['AstFrameBase']: """Get all parents of specified type (insertion order).""" return [p for p in self.parents_list if p.type == frame_type] def has_parent_type(self, frame_type: FrameNodeType) -> bool: """Check if frame has at least one parent of specified type.""" return any(p.type == frame_type for p in self.parents_list) @property def parent(self) -> Optional['AstFrameBase']: """Backward compatibility: returns primary parent.""" return self.get_primary_parent() def find_child_by_name(self, name: str) -> Optional['AstFrameBase']: """Find direct child by name.""" return next((c for c in self.children if c.name == name), None) def find_children_by_type(self, frame_type: FrameNodeType) -> List['AstFrameBase']: """Find all direct children of specific type.""" return [c for c in self.children if c.type == frame_type] def get_language_root(self) -> Optional['AstFrameBase']: """ Find language root by traversing parent graph with BFS. Tries primary parent chain first (fast path), then BFS through all parents. """ # Fast path: follow primary parent chain current = self.get_primary_parent() while current: if current.type == FrameNodeType.LANGUAGE: return current current = current.get_primary_parent() # Fallback: BFS through all parent paths if not self.parents_list: return None visited = set() queue = list(self.parents_list) while queue: current = queue.pop(0) if current.id in visited: continue visited.add(current.id) if current.type == FrameNodeType.LANGUAGE: return current queue.extend(current.parents_list) return None def is_descendant_of(self, ancestor: 'AstFrameBase') -> bool: """ Check if this frame is descendant of ancestor via ANY parent path (BFS). """ if not self.parents_list: return False visited = set() queue = list(self.parents_list) while queue: current = queue.pop(0) if current.id in visited: continue visited.add(current.id) if current == ancestor: return True queue.extend(current.parents_list) return False def walk_descendants(self) -> List['AstFrameBase']: """Get all descendants in depth-first order.""" result = [] for child in self.children: result.append(child) result.extend(child.walk_descendants()) return result @property def depth(self) -> int: """ Calculate minimum depth across all parent paths. Rationale: Minimum depth represents "most accessible" path to root. """ if not self.parents_list: return 0 # Calculate depth for each parent path, return minimum min_depth = float('inf') for parent in self.parents_list: parent_depth = parent.depth min_depth = min(min_depth, parent_depth + 1) return int(min_depth) if min_depth != float('inf') else 0 def has_cycle_to(self, potential_child: 'AstFrameBase') -> bool: """ Check if adding potential_child would create a cycle. Returns True if potential_child is an ancestor of self. """ return potential_child.is_descendant_of(self) def add_child_safe(self, child: 'AstFrameBase') -> bool: """ Add child with cycle detection. Returns True if added, False if would create cycle. """ if self.has_cycle_to(child): return False self.add_child(child) return True def get_parent_chain_for_mangling(self) -> List['AstFrameBase']: """ Build parent chain for mangling by traversing ONLY [CALLABLE, CLASS, PACKAGE] parents. Skips FILE, LANGUAGE, CODEBASE as per mangling requirements. Traverses multi-parent graph depth-first, prioritizing CALLABLE → CLASS → PACKAGE. """ chain = [] visited = set() def traverse(frame: 'AstFrameBase'): if frame.id in visited: return visited.add(frame.id) # Get parents in priority order: CALLABLE → CLASS → PACKAGE for parent_type in [FrameNodeType.CALLABLE, FrameNodeType.CLASS, FrameNodeType.PACKAGE]: parent_frame = frame.get_parent_by_type(parent_type) if parent_frame: traverse(parent_frame) # Recurse to build chain from root chain.append(parent_frame) break # Only follow one parent path (primary semantic parent) traverse(self) return chain @property def registry(self) -> Optional['FrameRegistry']: """Get the registry for this frame's language tree.""" if self._registry is None: language_root = self.get_language_root() if language_root and hasattr(language_root, '_registry'): self._registry = language_root._registry return self._registry def compute_id(self) -> str: """ Generate unique ID using enhanced content hash. Strategy: - Control flows: Position-based (stable across condition edits) - Semantic frames: Content-based (detect real changes) Returns: Unique identifier (16 hex chars) """ import hashlib # Use qualified_name as scope, fallback to name scope = self.qualified_name or self.name or "anonymous" # Control flows: position-only ID (content changes don't affect ID) if self.type.is_control_flow(): key = ( f"{self.file_path or 'unknown'}::" f"{scope}::" f"{self.type.value}" # NO content component - position-based only ) else: # Semantic frames (CLASS, CALLABLE, PACKAGE): content-based content_normalized = self._normalize_content(self.content) if self.content else "" key = ( f"{self.file_path or 'unknown'}::" f"{scope}::" f"{self.type.value}::" f"{content_normalized}" ) # Hash to create compact ID hash_obj = hashlib.sha256(key.encode('utf-8')) frame_id = hash_obj.hexdigest()[:16] # Store in id field self.id = frame_id return frame_id def _normalize_content(self, content: str) -> str: """ Normalize content for stable hashing. Removes: - Leading/trailing whitespace per line - Empty lines - Python comments (# and docstrings) - ALL whitespace (aggressive normalization for stability) This makes IDs stable across formatting and documentation changes. """ if not content: return "" import re # Remove docstrings (triple-quoted strings at start of blocks) # This is a heuristic - matches lines that are only docstrings content = re.sub(r'^\s*("""[\s\S]*?"""|\'\'\'[\s\S]*?\'\'\')\s*$', '', content, flags=re.MULTILINE) lines = content.split('\n') normalized_lines = [] for line in lines: stripped = line.strip() # Skip empty lines if not stripped: continue # Skip comment-only lines if stripped.startswith('#'): continue # Remove inline comments (simple heuristic) # This doesn't handle # inside strings perfectly, but good enough if '#' in stripped: # Split on # and take first part (before comment) parts = stripped.split('#') if len(parts) > 1: # Check if # is likely in a string (very simple check) before_hash = parts[0] quote_count_double = before_hash.count('"') - before_hash.count('\\"') quote_count_single = before_hash.count("'") - before_hash.count("\\'") # If quotes are balanced, # is likely a comment if quote_count_double % 2 == 0 and quote_count_single % 2 == 0: stripped = before_hash.strip() # AGGRESSIVE: Remove ALL whitespace for maximum stability # This makes 'def foo(x,y):' and 'def foo(x, y):' identical normalized = ''.join(stripped.split()) if normalized: normalized_lines.append(normalized) return '\n'.join(normalized_lines) @property def heading(self) -> str: """ Extract frame heading (declaration/signature). The heading is the "what it is" (declaration), separate from the "what it does" (implementation in content). Returns: Heading string (first significant line or computed signature) """ # Check cache first (for DB-loaded frames) if hasattr(self, '_cached_heading') and self._cached_heading: return self._cached_heading # Control flows: already first line only if self.type.is_control_flow(): return self.content or "" # Callables: extract signature if self.type == FrameNodeType.CALLABLE: return self._extract_signature() # Classes: extract class declaration if self.type == FrameNodeType.CLASS: return self._extract_class_declaration() # Packages: create package declaration if self.type == FrameNodeType.PACKAGE: return self._extract_package_declaration() # Fallback: first non-empty line if self.content: lines = self.content.split('\n') for line in lines: stripped = line.strip() if stripped: return stripped return "" def _extract_signature(self) -> str: """ Extract function/method signature (first significant line). Returns: Signature line without body """ if not self.content: return "" lines = self.content.split('\n') # Find first line with function/method declaration for line in lines: stripped = line.strip() # Skip empty lines, comments, decorators, imports if not stripped: continue if stripped.startswith(('#', '//', '/*', '@', 'import', 'from')): continue # This is the signature line return stripped # Fallback: first line return lines[0] if lines else "" def _extract_class_declaration(self) -> str: """ Extract class declaration header. Returns: Class declaration line """ if not self.content: return "" lines = self.content.split('\n') # Find first line with class declaration for line in lines: stripped = line.strip() # Skip empty lines, comments, decorators if not stripped: continue if stripped.startswith(('#', '//', '/*', '@', 'import', 'from')): continue # This is the class declaration return stripped # Fallback: first line return lines[0] if lines else "" def _extract_package_declaration(self) -> str: """ Extract or synthesize package declaration. Returns: Package declaration string """ # For Python: use qualified name if self.language == 'python': return f"# Package: {self.qualified_name}" # For Java/C++: look for package/namespace in content if self.content: lines = self.content.split('\n') for line in lines: stripped = line.strip() if stripped.startswith(('package ', 'namespace ')): return stripped # Fallback: use qualified name return f"# Package: {self.qualified_name}" @dataclass class AstEdge: """ Relationship between frames with confidence system. """ id: int subject_frame: AstFrameBase object_frame: AstFrameBase type: EdgeType # Confidence system for edges confidence: float = 1.0 confidence_tier: ConfidenceTier = ConfidenceTier.HIGH # Metadata for additional context metadata: Dict[str, Any] = field(default_factory=dict) def __post_init__(self): """Calculate confidence tier from confidence value.""" if self.confidence >= 0.8: self.confidence_tier = ConfidenceTier.HIGH elif self.confidence >= 0.5: self.confidence_tier = ConfidenceTier.MEDIUM elif self.confidence >= 0.2: self.confidence_tier = ConfidenceTier.LOW else: self.confidence_tier = ConfidenceTier.SPECULATIVE # Specialized frame types for specific languages (inheriting from base) @dataclass(slots=True) class AstCodebaseFrame(AstFrameBase): """Root codebase frame. Always type=CODEBASE.""" def __post_init__(self): self.type = FrameNodeType.CODEBASE self.name = self.name or "codebase" self.qualified_name = self.qualified_name or "codebase" @dataclass(slots=True) class AstLanguageFrame(AstFrameBase): def __post_init__(self): self.type = FrameNodeType.LANGUAGE # Language-specific initialization can go here @dataclass(slots=True) class AstPackageFrame(AstFrameBase): """Package/namespace frame.""" def __post_init__(self): self.type = FrameNodeType.PACKAGE @dataclass(slots=True) class AstClassFrame(AstFrameBase): """Class frame with field tracking.""" instance_fields: List['FieldInfo'] = field(default_factory=list) static_fields: List['FieldInfo'] = field(default_factory=list) def __post_init__(self): self.type = FrameNodeType.CLASS @dataclass(slots=True) class AstCallableFrame(AstFrameBase): """Callable frame with parameter tracking.""" parameters: List['ParameterInfo'] = field(default_factory=list) return_type: Optional[str] = None def __post_init__(self): self.type = FrameNodeType.CALLABLE @dataclass(slots=True) class AstVariableFrame(AstFrameBase): """ Variable frame with type information. """ declared_type: Optional[str] = None inferred_type: Optional[str] = None type_confidence: float = 1.0 is_mutable: bool = True scope: str = "local" # local, instance, class, global

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/y3i12/nabu_nisaba'

If you have feedback or need assistance with the MCP directory API, please join our Discord server