raw_extraction.py
""" Raw AST Extraction Pure tree-sitter → flat list: - Uses tree-sitter to parse source files - Extracts lightweight RawNode data holders - No relationships or business logic - Memory efficient (can discard AST nodes after extraction) """ from dataclasses import dataclass from typing import List, Optional, Dict, Any from pathlib import Path # Tree-sitter imports (same as current implementation) try: from tree_sitter_language_pack import get_parser, get_language from tree_sitter import Language, Parser, Node as TSNode except ImportError: raise ImportError("tree-sitter language modules not available. Install with: pip install tree-sitter tree-sitter-python tree-sitter-cpp tree-sitter-java") @dataclass class RawNode: """ Data holder from tree-sitter. """ node_type: str # tree-sitter node type start_line: int end_line: int start_byte: int end_byte: int content: str file_path: str ts_node: TSNode children_indices: List[int] = None # Just indices, not objects parent_index: Optional[int] = None def __post_init__(self): if self.children_indices is None: self.children_indices = [] class LanguageParser: """ Pure AST → flat list conversion. Uses language_registry for language detection and parser initialization. """ def __init__(self): self.parsers: Dict[str, Parser] = {} self._initialize_parsers() def _initialize_parsers(self) -> None: """Initialize tree-sitter parsers for all registered languages.""" from nabu.language_handlers import language_registry try: for language in language_registry.get_supported_languages(): self.parsers[language] = get_parser(language) except Exception as e: raise RuntimeError(f"Failed to initialize tree-sitter parsers: {e}") def detect_language(self, file_path: str) -> Optional[str]: """Detect language from file extension using language registry.""" from nabu.language_handlers import language_registry return language_registry.detect_language(file_path) def extract_raw_nodes(self, file_path: str) -> List[RawNode]: """ Extract raw nodes from file. Returns flat list of RawNode objects with no relationships. This is pure data extraction with no business logic. 
""" language = self.detect_language(file_path) if not language or language not in self.parsers: raise ValueError(f"Unsupported language for file: {file_path}") # Read source file try: with open(file_path, 'r', encoding='utf-8') as f: source_code = f.read() except Exception as e: raise IOError(f"Failed to read file {file_path}: {e}") # Parse with tree-sitter parser = self.parsers[language] tree = parser.parse(source_code.encode('utf-8')) # Extract raw nodes raw_nodes: List[RawNode] = [] source_lines = source_code.split('\n') def visit_node(ts_node: TSNode, parent_idx: Optional[int] = None) -> int: """Recursively visit tree-sitter nodes.""" current_idx = len(raw_nodes) # Extract content (convert tree-sitter 0-indexed to 1-indexed for storage) start_line = ts_node.start_point[0] + 1 end_line = ts_node.end_point[0] + 1 content = self._extract_node_content(ts_node, source_lines) # Create raw node raw_node = RawNode( node_type=ts_node.type, start_line=start_line, end_line=end_line, start_byte=ts_node.start_byte, end_byte=ts_node.end_byte, content=content, file_path=file_path, parent_index=parent_idx, ts_node=ts_node ) raw_nodes.append(raw_node) # Process children for child in ts_node.children: child_idx = visit_node(child, current_idx) raw_node.children_indices.append(child_idx) return current_idx # Start extraction from root visit_node(tree.root_node) return raw_nodes def _extract_node_content(self, ts_node: TSNode, source_lines: List[str]) -> str: """ Extract the source code content for a node. Handles multi-line nodes properly. """ start_line = ts_node.start_point[0] end_line = ts_node.end_point[0] start_col = ts_node.start_point[1] end_col = ts_node.end_point[1] if start_line == end_line: # Single line if start_line < len(source_lines): line = source_lines[start_line] return line[start_col:end_col] else: # Multi-line lines = [] for line_idx in range(start_line, min(end_line + 1, len(source_lines))): line = source_lines[line_idx] if line_idx == start_line: lines.append(line[start_col:]) elif line_idx == end_line: lines.append(line[:end_col]) else: lines.append(line) return '\n'.join(lines) return "" def get_supported_extensions(self) -> List[str]: """Get list of supported file extensions from language registry.""" from nabu.language_handlers import language_registry return language_registry.get_all_extensions() def discover_source_files(self, root_path: str, file_filter: Optional['FileFilter'] = None) -> List[str]: """ Discover source files in directory tree. Args: root_path: Root directory to search file_filter: Optional FileFilter to exclude files (e.g., .venv, node_modules) If None, no filtering is applied (backwards compatible) Returns: Sorted list of file paths matching extensions and filter criteria """ from nabu.language_handlers import language_registry source_files = [] root = Path(root_path) for extension in language_registry.get_all_extensions(): pattern = f"**/*{extension}" files = root.glob(pattern) for f in files: if not f.is_file(): continue file_path_str = str(f) # Apply filter if provided if file_filter and not file_filter.should_watch(file_path_str): continue source_files.append(file_path_str) return sorted(source_files)
