Skip to main content
Glama
multi_pass_parser.py (12.9 kB)
"""
Multi-Pass Parser

Orchestrates the three-phase processing pipeline.

This is the main entry point that coordinates:
1. Raw AST Extraction
2. Semantic Frame Creation
3. Symbol Resolution

Implements graceful degradation and confidence-based parsing.
"""

import logging
from typing import Any, Dict, List, Optional
from pathlib import Path

from nabu.parsing.raw_extraction import LanguageParser, RawNode
from nabu.parsing.graph_builder import GraphBuilder
from nabu.parsing.symbol_resolver import SymbolResolver
from nabu.core.frames import AstFrameBase, AstEdge
from nabu.core.frame_types import FrameNodeType
from nabu.core.confidence import ConfidenceContext

logger = logging.getLogger(__name__)


class MultiPassParser:
    """
    Main parser orchestrating the three-phase pipeline.

    A single CodebaseContext is created per parser instance and handed to the
    GraphBuilder and SymbolResolver; the hierarchy edges produced in phase 2
    are stored on that context and combined with symbol edges in phase 3.
    """

    def __init__(self, extra_ignore_patterns: Optional[List[str]] = None):
        """
        Create the parser and its collaborating components.

        Args:
            extra_ignore_patterns: Additional gitignore-style patterns applied
                on top of the default ignores and the repository .gitignore
                when discovering source files. ``None`` means no extras.
        """
        from nabu.core.codebase_context import CodebaseContext

        # Create shared context for all parsing components
        self.context = CodebaseContext()

        # Store extra ignore patterns for file filtering
        self.extra_ignore_patterns = extra_ignore_patterns or []

        # Initialize parsing components with shared context
        self.language_parser = LanguageParser()
        self.graph_builder = GraphBuilder(self.context)
        self.symbol_resolver = SymbolResolver(self.context)

    def parse_codebase(self, codebase_path: str) -> tuple[AstFrameBase, List[AstEdge]]:
        """
        Main entry point for parsing entire codebase.

        Runs the three phases in order, setting the pass number on the shared
        confidence context before each one.

        Args:
            codebase_path: Root directory of the codebase to parse.

        Returns:
            ``(codebase_frame, edges)``. If any phase raises, the error is
            logged and a minimal codebase frame with an empty edge list is
            returned instead (graceful degradation).
        """
        logger.info(f"Starting multi-pass parsing of codebase: {codebase_path}")

        try:
            # Phase 1: Structure Discovery (confidence = 1.0)
            logger.info("Phase 1: Raw AST extraction")
            self.context.confidence_context.set_pass(1)
            raw_nodes_by_file = self._extract_raw_structure(codebase_path)

            # Phase 2: Frame Hierarchy Creation
            # (confidence = 1.0 for parsed, 0.6-0.9 for inferred)
            logger.info("Phase 2: Semantic frame creation")
            self.context.confidence_context.set_pass(2)
            codebase_frame = self._build_frame_hierarchy(codebase_path, raw_nodes_by_file)

            # Phase 3: Symbol Resolution (confidence = 0.3-0.9)
            logger.info("Phase 3: Symbol resolution")
            self.context.confidence_context.set_pass(3)
            edges = self._resolve_symbols(codebase_frame)

            # Phase 4: Additional processing could go here
            # self.context.confidence_context.set_pass(4)
            # self._fuzzy_resolve_unknowns(codebase_frame)

            logger.info(f"Parsing completed. Created {self._count_frames(codebase_frame)} frames and {len(edges)} edges")
            return codebase_frame, edges

        except Exception as e:
            # exc_info keeps the traceback; without it the root cause of a
            # degraded parse is lost.
            logger.error(f"Parsing failed: {e}", exc_info=True)
            # Graceful degradation - create minimal codebase frame
            return self._create_minimal_codebase(codebase_path), []

    def _extract_raw_structure(self, codebase_path: str) -> Dict[str, List[RawNode]]:
        """
        Extract raw AST structure from all source files.

        Uses FileFilter to exclude files based on gitignore-style patterns
        (default ignores + repository .gitignore + extra configured patterns).
        Files whose extraction raises are logged and skipped.

        Args:
            codebase_path: Root directory of the codebase.

        Returns:
            Mapping of file_path -> List[RawNode] for every file that was
            extracted successfully.
        """
        from nabu.file_watcher.filters import FileFilter
        from nabu.language_handlers import language_registry

        # Create file filter with default ignores + repository .gitignore + extra patterns
        ignore_patterns = FileFilter.default_ignores()

        repo_gitignore_patterns = self._load_gitignore_patterns(codebase_path)
        if repo_gitignore_patterns:
            ignore_patterns.extend(repo_gitignore_patterns)
            logger.debug(f"Loaded {len(repo_gitignore_patterns)} patterns from .gitignore")

        # Add extra patterns from config
        if self.extra_ignore_patterns:
            ignore_patterns.extend(self.extra_ignore_patterns)
            logger.debug(f"Added {len(self.extra_ignore_patterns)} extra ignore patterns from config")

        # Get watched extensions from language registry
        watch_extensions = language_registry.get_all_extensions()

        file_filter = FileFilter(
            ignore_patterns=ignore_patterns,
            watch_extensions=watch_extensions,
            codebase_path=Path(codebase_path)
        )

        # Discover source files with filtering
        source_files = self.language_parser.discover_source_files(codebase_path, file_filter)
        logger.info(f"Discovered {len(source_files)} source files (after filtering)")

        raw_nodes_by_file: Dict[str, List[RawNode]] = {}
        for file_path in source_files:
            try:
                raw_nodes = self.language_parser.extract_raw_nodes(file_path)
                raw_nodes_by_file[file_path] = raw_nodes
                logger.debug(f"Extracted {len(raw_nodes)} raw nodes from {file_path}")
            except Exception as e:
                logger.warning(f"Failed to extract raw nodes from {file_path}: {e}")
                # Continue with other files - graceful degradation
                continue

        logger.info(f"Phase 1 completed: extracted raw nodes from {len(raw_nodes_by_file)} files")
        return raw_nodes_by_file

    def _load_gitignore_patterns(self, codebase_path: str) -> List[str]:
        """
        Load gitignore patterns from repository's .gitignore file.

        Blank lines and lines starting with ``#`` are dropped; every other
        line is returned stripped of surrounding whitespace.

        Args:
            codebase_path: Root directory of codebase

        Returns:
            List of gitignore patterns, or empty list if .gitignore is not
            found or cannot be read.
        """
        gitignore_path = Path(codebase_path) / ".gitignore"

        if not gitignore_path.exists():
            return []

        try:
            with open(gitignore_path, 'r', encoding='utf-8') as f:
                stripped_lines = (line.strip() for line in f)
                # Skip empty lines and comments
                return [line for line in stripped_lines if line and not line.startswith('#')]
        except (OSError, UnicodeError) as e:
            # Unreadable or undecodable .gitignore: fall back to no repo patterns.
            logger.warning(f"Failed to load .gitignore from {gitignore_path}: {e}")
            return []

    def _build_frame_hierarchy(self, codebase_path: str, raw_nodes_by_file: Dict[str, List[RawNode]]) -> AstFrameBase:
        """
        Phase 2: Build semantic frame hierarchy.

        Creates proper CODEBASE → LANGUAGE → (FILE/PACKAGE) → CLASS structure
        and stores the GraphBuilder's edges on ``self.context.hierarchy_edges``.

        Args:
            codebase_path: Root directory of the codebase.
            raw_nodes_by_file: Phase 1 output, file_path -> raw nodes.

        Returns:
            The codebase root frame, or a minimal codebase frame if building
            the hierarchy raised.
        """
        try:
            codebase_frame = self.graph_builder.build_codebase_graph(codebase_path, raw_nodes_by_file)

            # Collect edges from GraphBuilder's FrameStack
            self.context.hierarchy_edges = self.graph_builder.get_all_edges()

            frame_count = self._count_frames(codebase_frame)
            edge_count = len(self.context.hierarchy_edges)
            logger.info(f"Phase 2 completed: built frame hierarchy with {frame_count} frames and {edge_count} edges")

            return codebase_frame

        except Exception as e:
            logger.error(f"Frame hierarchy building failed: {e}", exc_info=True)
            # The minimal frame has no hierarchy; reset the edge list so
            # phase 3 neither reuses edges from an earlier run nor reads an
            # attribute that was never assigned.
            self.context.hierarchy_edges = []
            return self._create_minimal_codebase(codebase_path)

    def _resolve_symbols(self, codebase_frame: AstFrameBase) -> List[AstEdge]:
        """
        Phase 3: Resolve cross-references and create edges.

        Implements confidence-based symbol resolution.

        Args:
            codebase_frame: Root frame produced by phase 2.

        Returns:
            Hierarchy edges followed by symbol edges. If symbol resolution
            raises, only the hierarchy edges are returned.
        """
        try:
            symbol_edges = self.symbol_resolver.resolve_references(codebase_frame)

            # Combine hierarchy edges from GraphBuilder with symbol edges
            all_edges = self.context.hierarchy_edges + symbol_edges

            hierarchy_count = len(self.context.hierarchy_edges)
            symbol_count = len(symbol_edges)
            total_count = len(all_edges)

            logger.info(f"Phase 3 completed: {hierarchy_count} hierarchy edges + {symbol_count} symbol edges = {total_count} total edges")
            return all_edges

        except Exception as e:
            logger.error(f"Symbol resolution failed: {e}", exc_info=True)
            # Return at least the hierarchy edges if symbol resolution fails
            return self.context.hierarchy_edges

    def _create_minimal_codebase(self, codebase_path: str) -> AstFrameBase:
        """
        Create minimal codebase frame for graceful degradation.

        The frame is named after the last path component of ``codebase_path``
        and is marked with confidence 0.1 and provenance ``"parse_failed"``.
        """
        from nabu.core.frames import AstCodebaseFrame

        codebase_name = Path(codebase_path).name

        return AstCodebaseFrame(
            id=0,
            type=FrameNodeType.CODEBASE,
            name=codebase_name,
            qualified_name=codebase_name,
            confidence=0.1,
            provenance="parse_failed",
            metadata={'error': 'Parsing failed, created minimal frame'}
        )

    def _count_frames(self, codebase_frame: AstFrameBase) -> int:
        """
        Count total UNIQUE frames in hierarchy.

        CRITICAL: Uses visited set to avoid counting frames multiple times
        in multi-parent graphs where a frame can appear as child of multiple
        parents.

        The walk is iterative so deep hierarchies cannot exhaust the
        interpreter's recursion limit.
        """
        visited = set()
        pending = [codebase_frame]

        while pending:
            frame = pending.pop()
            if frame.id in visited:
                continue  # Already counted
            visited.add(frame.id)
            pending.extend(frame.children)

        return len(visited)

    # Convenience methods for Phase-by-Phase parsing (for testing/debugging)

    def parse_single_file(self, file_path: str) -> tuple[AstFrameBase, List[AstEdge]]:
        """
        Parse a single file for testing.

        Creates minimal codebase → language → file hierarchy, using the
        file's parent directory as the codebase path.

        Unlike ``parse_codebase``, exceptions from raw node extraction are not
        caught here and propagate to the caller.

        Args:
            file_path: Path of the file to parse.

        Returns:
            ``(codebase_frame, edges)`` for the single-file hierarchy.
        """
        # NOTE(review): parse_codebase calls confidence_context.set_pass(1..3)
        # before each phase; this method does not. Confirm whether that is
        # intentional.

        # Create minimal hierarchy for single file
        raw_nodes = self.language_parser.extract_raw_nodes(file_path)
        raw_nodes_by_file = {file_path: raw_nodes}

        # Use parent directory as codebase
        codebase_path = str(Path(file_path).parent)

        codebase_frame = self._build_frame_hierarchy(codebase_path, raw_nodes_by_file)
        edges = self._resolve_symbols(codebase_frame)

        # Set file_path for CODEBASE and LANGUAGE frames for incremental update tracking
        # This ensures they can be queried by file_path in the database
        # Note: We set file_path AFTER compute_id() was called, so IDs remain stable
        absolute_file_path = str(Path(file_path).resolve())
        codebase_frame.file_path = absolute_file_path

        for language_frame in codebase_frame.find_children_by_type(FrameNodeType.LANGUAGE):
            language_frame.file_path = absolute_file_path

        return codebase_frame, edges

    def get_statistics(self, codebase_frame: AstFrameBase, edges: List[AstEdge]) -> Dict[str, Any]:
        """
        Get parsing statistics.

        Args:
            codebase_frame: Root frame of the parsed hierarchy.
            edges: Edges returned alongside the frame.

        Returns:
            Dict with keys ``total_frames``, ``total_edges``, ``languages``,
            ``frame_types``, ``edge_types`` and ``confidence_distribution``.
            The confidence distribution counts both frames and edges; tiers
            outside high/medium/low/speculative are ignored.
        """
        stats: Dict[str, Any] = {
            'total_frames': self._count_frames(codebase_frame),
            'total_edges': len(edges),
            'languages': [],
            'frame_types': {},
            'edge_types': {},
            'confidence_distribution': {'high': 0, 'medium': 0, 'low': 0, 'speculative': 0}
        }

        # Collect frame statistics
        self._collect_frame_stats(codebase_frame, stats)

        # Collect edge statistics
        edge_types = stats['edge_types']
        confidence_distribution = stats['confidence_distribution']
        for edge in edges:
            edge_type = edge.type.value
            # Tally under 'edge_types' (the key initialised above).
            edge_types[edge_type] = edge_types.get(edge_type, 0) + 1

            # Confidence distribution for edges; guarded the same way as
            # frames so an unexpected tier name cannot raise KeyError.
            tier = edge.confidence_tier.value.lower()
            if tier in confidence_distribution:
                confidence_distribution[tier] += 1

        return stats

    def _collect_frame_stats(self, frame: AstFrameBase, stats: Dict[str, Any], visited: Optional[set] = None) -> None:
        """
        Recursively collect frame statistics with cycle detection.

        Args:
            frame: Frame to record; its children are visited recursively.
            stats: Statistics dict to update in place (``frame_types``,
                ``languages`` and ``confidence_distribution`` entries).
            visited: Ids of frames already recorded; created on the first
                call and shared across the recursion.
        """
        if visited is None:
            visited = set()

        # Prevent infinite recursion in multi-parent graphs
        if frame.id in visited:
            return
        visited.add(frame.id)

        # Frame type distribution
        frame_type = frame.type.value
        stats['frame_types'][frame_type] = stats['frame_types'].get(frame_type, 0) + 1

        # Language detection
        if frame.language and frame.language not in stats['languages']:
            stats['languages'].append(frame.language)

        # Confidence distribution for frames
        tier = frame.confidence_tier.value.lower()
        if tier in stats['confidence_distribution']:
            stats['confidence_distribution'][tier] += 1

        # Recurse to children
        for child in frame.children:
            self._collect_frame_stats(child, stats, visited)

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/y3i12/nabu_nisaba'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.