Skip to main content
Glama
graph_builder.py (22.6 kB)
from typing import List, Dict, Optional, TYPE_CHECKING
from pathlib import Path
import logging

from nabu.parsing.raw_extraction import RawNode
from nabu.core.frames import (
    AstFrameBase, AstCodebaseFrame, AstLanguageFrame, AstPackageFrame,
    AstClassFrame, AstCallableFrame, AstEdge
)
from nabu.core.frame_types import FrameNodeType, EdgeType
from nabu.core.frame_stack import FrameStack
from nabu.core.confidence import ConfidenceCalculator
from nabu.core.registry import FrameRegistry
from nabu.language_handlers import language_registry

if TYPE_CHECKING:
    # Only needed for the annotation on GraphBuilder.__init__; keeping it out
    # of the runtime import graph avoids a needless import on construction.
    from nabu.core.codebase_context import CodebaseContext

logger = logging.getLogger(__name__)


class GraphBuilder:
    """
    Converts raw nodes to semantic frames.

    All mutable state (frame stack, registries, processed files, ...) is read
    from and written to the shared context object passed to the constructor.
    """

    def _get_frame_mappings(self, language: str) -> Dict[str, FrameNodeType]:
        """
        Get frame mappings for a specific language from the language handler.

        Args:
            language: Language name (python, cpp, java, perl)

        Returns:
            Dictionary mapping tree-sitter node types to FrameNodeType, or an
            empty dictionary (after logging a warning) when no handler is
            registered for the language.
        """
        handler = language_registry.get_handler(language)
        if handler:
            return handler.get_frame_mappings()

        # Fallback to empty dict if no handler found
        logger.warning(f"No handler found for language: {language}")
        return {}

    def __init__(self, context: 'CodebaseContext'):
        """
        Initialize GraphBuilder with shared CodebaseContext.

        Args:
            context: Shared context containing registries, frame_stack, etc.
        """
        self.context = context

    def build_codebase_graph(self, root_path: str, file_raw_nodes: Dict[str, List[RawNode]]) -> AstFrameBase:
        """
        Build complete codebase graph.

        Creates the hierarchy CODEBASE → LANGUAGE → PACKAGE → CLASS/... and
        records each processed file on the context so it is not built twice.

        Args:
            root_path: Root directory of the codebase; its last path component
                becomes the codebase frame's name.
            file_raw_nodes: Mapping of file path to the raw nodes of that file.

        Returns:
            The root codebase frame. Edges created along the way are available
            through get_all_edges().
        """
        # Create root codebase frame
        codebase_name = Path(root_path).name
        codebase_frame = AstCodebaseFrame(
            id="temp",  # Temporary - will be computed immediately
            type=FrameNodeType.CODEBASE,
            name=codebase_name,
            qualified_name=codebase_name,
            confidence=1.0,
            provenance="parsed"
        )
        # Compute ID immediately so edges can reference it
        codebase_frame.compute_id()

        # Store root path for relative path calculation
        self.context.codebase_root = root_path

        # Initialize frame stack with codebase
        self.context.initialize_frame_stack(codebase_frame)

        # Group files by language
        language_files = self._group_files_by_language(file_raw_nodes)

        # Create language root frames
        for language, files in language_files.items():
            # Reuse existing language frame or create new one
            if language not in self.context.language_frames:
                language_frame = self._create_language_frame(language, codebase_frame)
                self.context.language_frames[language] = language_frame
                # Initialize registry for this language
                self.context.registries[language] = FrameRegistry(language_frame)
            else:
                language_frame = self.context.language_frames[language]

            with self.context.frame_stack.language_context(language_frame):
                # Process all files for this language
                for file_path, raw_nodes in files.items():
                    # Check if file already processed
                    if file_path not in self.context.processed_files:
                        self._build_file_hierarchy(file_path, raw_nodes, language, language_frame)
                        # _build_file_hierarchy records the file as well;
                        # adding to the set again is a harmless no-op.
                        self.context.processed_files.add(file_path)

        return codebase_frame

    def _group_files_by_language(self, file_raw_nodes: Dict[str, List[RawNode]]) -> Dict[str, Dict[str, List[RawNode]]]:
        """
        Group files by detected language.

        Files for which the parser detects no language are dropped.

        Args:
            file_raw_nodes: Mapping of file path to raw nodes.

        Returns:
            Mapping of language name to {file path: raw nodes}.
        """
        from .raw_extraction import LanguageParser

        language_files: Dict[str, Dict[str, List[RawNode]]] = {}
        parser = LanguageParser()

        for file_path, raw_nodes in file_raw_nodes.items():
            language = parser.detect_language(file_path)
            if language:
                language_files.setdefault(language, {})[file_path] = raw_nodes

        return language_files

    def _add_contained_child(self, parent: AstFrameBase, child: AstFrameBase) -> None:
        """
        Attach child to parent and record the matching CONTAINS edge.

        The edge is created through the context's frame stack so that it is
        returned by get_all_edges().

        Args:
            parent: Frame that receives the child.
            child: Frame to attach.
        """
        parent.add_child(child)
        edge_confidence = ConfidenceCalculator.calculate_edge_confidence(
            EdgeType.CONTAINS,
            parent.confidence,
            child.confidence
        )
        self.context.frame_stack._create_edge(
            parent, child, EdgeType.CONTAINS, edge_confidence
        )

    def _create_language_frame(self, language: str, codebase_frame: AstFrameBase) -> AstLanguageFrame:
        """
        Create language root frame.

        Checks for existing language frame to avoid duplicates.

        Args:
            language: Language name; the frame is named "<language>_root".
            codebase_frame: Root frame the language frame is attached to.

        Returns:
            The existing language frame if the codebase frame already has one
            with this name, otherwise the newly created frame.
        """
        language_name = f"{language}_root"

        # Check if language frame already exists
        existing_language_frame = codebase_frame.find_child_by_name(language_name)
        if existing_language_frame and existing_language_frame.type == FrameNodeType.LANGUAGE:
            return existing_language_frame

        # Create new language frame
        language_frame = AstLanguageFrame(
            id="temp",  # Temporary - will be computed immediately
            type=FrameNodeType.LANGUAGE,
            name=language_name,
            qualified_name=f"{codebase_frame.qualified_name}.{language_name}",
            language=language,
            confidence=1.0,
            provenance="parsed"
        )
        # Compute ID immediately so edges can reference it
        language_frame.compute_id()

        # Attach and create the CODEBASE→LANGUAGE edge for database export
        self._add_contained_child(codebase_frame, language_frame)

        return language_frame

    def _build_file_hierarchy(
        self,
        file_path: str,
        raw_nodes: List[RawNode],
        language: str,
        language_frame: AstLanguageFrame
    ) -> None:
        """
        Build hierarchy for a single file.

        Creates package frames based on file path, then processes nodes.
        Children become direct descendants of PACKAGE or LANGUAGE (no FILE frame).

        Args:
            file_path: Path to the source file.
            raw_nodes: Raw nodes extracted from the file.
            language: Programming language of the file.
            language_frame: Language root frame for this language.
        """
        # Track this file as processed
        self.context.processed_files.add(file_path)

        # Create package hierarchy from file path
        package_frame = self._create_package_hierarchy(file_path, language_frame, language)

        if package_frame:
            with self.context.frame_stack.push_context(package_frame):
                self._process_raw_nodes(raw_nodes, language)
        else:
            # No package - process nodes directly in the current context.
            # The caller already entered language_context(), so pushing the
            # language frame again would duplicate LANGUAGE on the stack.
            self._process_raw_nodes(raw_nodes, language)

    def _create_package_hierarchy(
        self,
        file_path: str,
        language_frame: AstLanguageFrame,
        language: str
    ) -> Optional[AstPackageFrame]:
        """
        Create package hierarchy from file path using language handler.

        Args:
            file_path: Path to source file
            language_frame: Language frame to attach packages to
            language: Programming language

        Returns:
            Deepest package frame in hierarchy, or None when there is no
            handler for the language or the handler yields no package parts.
        """
        handler = language_registry.get_handler(language)
        if not handler:
            logger.warning(f"No handler found for language: {language}")
            return None

        # Get package parts from handler
        package_parts = handler.extract_package_hierarchy_from_path(
            file_path,
            self.context.codebase_root
        )

        if not package_parts:
            return None

        # Language handlers return parts in correct order (base to leaf),
        # e.g. ['com', 'example', 'utils'] for com.example.utils - no reversal.
        separator = handler.get_separator()

        # Create package hierarchy with registry-based deduplication
        current_frame = language_frame
        for part in package_parts:
            # Build qualified name for this package
            qualified_name = f"{current_frame.qualified_name}{separator}{part}"

            # Use package registry as single source of truth
            existing = self.context.package_registry.get(qualified_name)

            if existing and existing.type == FrameNodeType.PACKAGE:
                # Reuse existing package - only link it when the parent does
                # not already hold it, which prevents duplicate CONTAINS edges.
                if existing not in current_frame.children:
                    self._add_contained_child(current_frame, existing)
                current_frame = existing
            else:
                package_frame = AstPackageFrame(
                    id="temp",  # Temporary - will be computed
                    type=FrameNodeType.PACKAGE,
                    name=part,
                    qualified_name=qualified_name,
                    language=language_frame.language,
                    confidence=1.0,
                    provenance="parsed",
                    file_path=file_path,  # Associate with source file
                    start_line=0,  # Package spans entire file
                    end_line=0,
                    start_byte=0,
                    end_byte=0
                )
                # Compute stable ID
                package_frame.compute_id()

                # Register first, then add to parent
                self.context.package_registry[qualified_name] = package_frame
                self._add_contained_child(current_frame, package_frame)
                current_frame = package_frame

        return current_frame

    def get_all_edges(self) -> List[AstEdge]:
        """
        Collect all edges created during graph building.

        Returns:
            A shallow copy of the edge list held by the context's frame stack.
        """
        return self.context.frame_stack.edges.copy()

    def _process_raw_nodes(self, nodes_to_process: List[RawNode], language: str,
                           all_raw_nodes: Optional[List[RawNode]] = None,
                           processed_indices: Optional[set] = None,
                           _depth: int = 0) -> None:
        """
        Process raw nodes and create semantic frames.

        Each raw node whose node_type appears in the language's frame mappings
        becomes a frame attached to the current context; all other nodes are
        skipped.

        Args:
            nodes_to_process: Nodes to iterate through and process
            language: Programming language
            all_raw_nodes: Complete array that children_indices refer to
                (defaults to nodes_to_process)
            processed_indices: Set of indices already processed as children
                (for hierarchical nesting)
            _depth: Internal recursion depth counter, forwarded to
                _process_child_nodes
        """
        if all_raw_nodes is None:
            all_raw_nodes = nodes_to_process
        if processed_indices is None:
            processed_indices = set()

        mappings = self._get_frame_mappings(language)

        # Index map keyed by object identity, so each lookup below is O(1)
        # instead of a list.index() scan.
        # NOTE: the map is rebuilt on every call, including recursive ones.
        node_to_index = {id(node): idx for idx, node in enumerate(all_raw_nodes)}

        for raw_node in nodes_to_process:
            # processed_indices tracks indices in all_raw_nodes, not positions
            # in nodes_to_process, so resolve the node's real index first.
            actual_index = node_to_index.get(id(raw_node))
            if actual_index is None:
                # Node not in all_raw_nodes (shouldn't happen, but be defensive)
                logger.warning(f"Node {raw_node.node_type} at line {raw_node.start_line} not found in all_raw_nodes")
                continue

            # Skip if already processed as child of another frame
            if actual_index in processed_indices:
                continue

            # Decision 1: Should this raw node become a frame?
            frame_type = mappings.get(raw_node.node_type)
            if not frame_type:
                continue  # Skip non-semantic nodes

            # Create semantic frame
            frame = self._create_semantic_frame(raw_node, frame_type, language)

            # ALL frames become children of current context
            self.context.frame_stack.add_child_to_current(frame)

            # Decision 2: Should this frame create a new context scope?
            if self._should_push_to_stack(frame_type):
                # Push context for processing children
                with self.context.frame_stack.push_context(frame):
                    # Always pass all_raw_nodes so children_indices remain valid
                    self._process_child_nodes(raw_node, all_raw_nodes, language, processed_indices, _depth)

    def _create_semantic_frame(self, raw_node: RawNode, frame_type: FrameNodeType, language: str) -> AstFrameBase:
        """
        Create semantic frame from raw node.

        Delegates frame construction to FrameFactory.create_frame.

        Args:
            raw_node: Raw node the frame is built from.
            frame_type: Frame type the node maps to.
            language: Programming language.

        Returns:
            The frame returned by the factory.
        """
        from nabu.parsing.frame_factory import FrameFactory

        # Extract name
        name = self._extract_name_from_content(raw_node.content, frame_type, raw_node, language)

        # Qualified name is the current frame-stack path plus this name;
        # frames without a name get no qualified name.
        context_path = self.context.frame_stack.get_context_path()
        qualified_name = '.'.join(context_path + [name]) if name else None

        return FrameFactory.create_frame(
            frame_type=frame_type,
            name=name,
            qualified_name=qualified_name,
            raw_node=raw_node,
            language=language,
            context=self.context
        )

    def _should_push_to_stack(self, frame_type: FrameNodeType) -> bool:
        """
        Determine if frame type should create a new context scope.

        Context-creating frames push a new level onto the frame stack, so
        frames created while they are on the stack become their children and
        inherit their path in qualified names.

        Args:
            frame_type: Frame type to check.

        Returns:
            Whatever frame_type.creates_context() reports.
        """
        return frame_type.creates_context()

    def _process_child_nodes(self, parent_raw_node: RawNode, all_raw_nodes: List[RawNode],
                             language: str, processed_indices: Optional[set] = None,
                             _depth: int = 0) -> None:
        """
        Process child nodes within parent context.

        Walks the parent's descendants, drilling through nodes that do not map
        to a frame type (such as 'block') and stopping at nodes that do; only
        nodes inside the parent's byte range are considered. The collected
        nodes are then handed to _process_raw_nodes.

        Example:
            class_definition (CppMangler)
              → block
                → function_definition (__init__)
                → function_definition (mangle)
                → function_definition (_generate_itanium_name)

            All methods are children of the block, which is a child of the
            class, so they are processed as children of CppMangler rather than
            as children of __init__.

        Args:
            parent_raw_node: The parent raw node whose descendants we're processing
            all_raw_nodes: Complete list of all raw nodes in the file
            language: Programming language
            processed_indices: Set of indices that have been processed at parent
                level; updated in place with every index collected here
            _depth: Internal recursion depth counter (for safety)
        """
        # Safety check: prevent infinite recursion.
        # NOTE(review): every nesting level uses several interpreter frames, so
        # with CPython's default recursion limit a RecursionError would
        # probably surface before this counter reaches 1000 - confirm whether
        # a lower cap is intended.
        if _depth > 1000:
            logger.error(f"Maximum recursion depth exceeded in _process_child_nodes for {parent_raw_node.node_type} at line {parent_raw_node.start_line}")
            return

        if not parent_raw_node.children_indices:
            return

        if processed_indices is None:
            processed_indices = set()

        # The mappings do not change while collecting, so fetch them once
        # instead of once per visited node.
        mappings = self._get_frame_mappings(language)
        node_count = len(all_raw_nodes)

        descendant_nodes = []
        collected_indices = set()  # Track what we've added to avoid duplicates

        def collect_children_smart(idx: int):
            """
            Recursively collect children, but stop descending at semantic nodes.

            - Passthrough nodes (block, etc): drill through to find semantic children
            - Semantic nodes: add them, but DON'T recurse into their children
            """
            # Reject out-of-range indices; a negative index would otherwise
            # silently wrap around to the end of the list.
            if idx < 0 or idx >= node_count:
                return
            if idx in collected_indices or idx in processed_indices:
                return

            node = all_raw_nodes[idx]

            # Only include nodes within parent's byte range
            if not (node.start_byte >= parent_raw_node.start_byte and
                    node.end_byte <= parent_raw_node.end_byte):
                return

            descendant_nodes.append(node)
            collected_indices.add(idx)

            # Semantic node: stop - its children are processed when it is pushed.
            # Non-semantic node: drill through to find semantic children.
            if node.node_type not in mappings:
                for child_idx in node.children_indices:
                    collect_children_smart(child_idx)

        # Start collection from parent's direct children
        for child_idx in parent_raw_node.children_indices:
            collect_children_smart(child_idx)

        # Non-semantic nodes in the collection are skipped by the mappings
        # check in _process_raw_nodes. all_raw_nodes is forwarded so that
        # children_indices stay valid, and the same processed_indices set is
        # forwarded so that updates are visible to the caller.
        self._process_raw_nodes(descendant_nodes, language, all_raw_nodes, processed_indices, _depth + 1)

        # Mark everything collected here as processed so the caller does not
        # process these nodes again after we return.
        processed_indices.update(collected_indices)

    def _extract_name_from_content(self, content: str, frame_type: FrameNodeType,
                                   raw_node: RawNode, language: str) -> Optional[str]:
        """
        Extract name from raw content using language handler.

        Args:
            content: Raw source code content
            frame_type: Type of frame (CLASS, CALLABLE, PACKAGE, or control flow)
            raw_node: Raw node with metadata
            language: Programming language

        Returns:
            The handler-extracted name for CLASS/CALLABLE/PACKAGE frames, a
            generated "<type>_line_<line>_byte_<byte>" name for control flow
            and SCOPE frames, or None when there is no handler, the content is
            blank, or the frame type matches neither group.
        """
        handler = language_registry.get_handler(language)
        if not handler:
            logger.warning(f"No handler found for language: {language}")
            return None

        if not content or not content.strip():
            return None

        # Structural types: delegate to language-specific handler
        if frame_type.has_semantic_name():
            if frame_type == FrameNodeType.CLASS:
                return handler.extract_class_name(content, raw_node)
            if frame_type == FrameNodeType.CALLABLE:
                return handler.extract_callable_name(content, raw_node)
            if frame_type == FrameNodeType.PACKAGE:
                return handler.extract_package_name(content, raw_node)

        # Control flow / scope frames have no name of their own, so build one
        # from the start line and byte offset to keep names from colliding.
        if frame_type.is_control_flow() or frame_type == FrameNodeType.SCOPE:
            type_name = frame_type.value.lower()
            return f"{type_name}_line_{raw_node.start_line}_byte_{raw_node.start_byte}"

        return None

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/y3i12/nabu_nisaba'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.