code-index-mcp

json_index_builder.py (15.9 kB)
""" JSON Index Builder - Clean implementation using Strategy pattern. This replaces the monolithic parser implementation with a clean, maintainable Strategy pattern architecture. """ import logging import os import time from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor, as_completed from dataclasses import dataclass, asdict from pathlib import Path from typing import Dict, List, Optional, Any, Tuple from .strategies import StrategyFactory from .models import SymbolInfo, FileInfo logger = logging.getLogger(__name__) @dataclass class IndexMetadata: """Metadata for the JSON index.""" project_path: str indexed_files: int index_version: str timestamp: str languages: List[str] total_symbols: int = 0 specialized_parsers: int = 0 fallback_files: int = 0 class JSONIndexBuilder: """ Main index builder using Strategy pattern for language parsing. This class orchestrates the index building process by: 1. Discovering files in the project 2. Using StrategyFactory to get appropriate parsers 3. Extracting symbols and metadata 4. Assembling the final JSON index """ def __init__(self, project_path: str, additional_excludes: Optional[List[str]] = None): from ..utils import FileFilter # Input validation if not isinstance(project_path, str): raise ValueError(f"Project path must be a string, got {type(project_path)}") project_path = project_path.strip() if not project_path: raise ValueError("Project path cannot be empty") if not os.path.isdir(project_path): raise ValueError(f"Project path does not exist: {project_path}") self.project_path = project_path self.in_memory_index: Optional[Dict[str, Any]] = None self.strategy_factory = StrategyFactory() self.file_filter = FileFilter(additional_excludes) logger.info(f"Initialized JSON index builder for {project_path}") strategy_info = self.strategy_factory.get_strategy_info() logger.info(f"Available parsing strategies: {len(strategy_info)} types") # Log specialized vs fallback coverage specialized = len(self.strategy_factory.get_specialized_extensions()) fallback = len(self.strategy_factory.get_fallback_extensions()) logger.info(f"Specialized parsers: {specialized} extensions, Fallback coverage: {fallback} extensions") def _process_file(self, file_path: str, specialized_extensions: set) -> Optional[Tuple[Dict, Dict, str, bool]]: """ Process a single file - designed for parallel execution. Args: file_path: Path to the file to process specialized_extensions: Set of extensions with specialized parsers Returns: Tuple of (symbols, file_info, language, is_specialized) or None on error """ try: with open(file_path, 'r', encoding='utf-8', errors='ignore') as f: content = f.read() ext = Path(file_path).suffix.lower() rel_path = os.path.relpath(file_path, self.project_path).replace('\\', '/') # Get appropriate strategy strategy = self.strategy_factory.get_strategy(ext) # Track strategy usage is_specialized = ext in specialized_extensions # Parse file using strategy symbols, file_info = strategy.parse_file(rel_path, content) logger.debug(f"Parsed {rel_path}: {len(symbols)} symbols ({file_info.language})") return (symbols, {rel_path: file_info}, file_info.language, is_specialized) except Exception as e: logger.warning(f"Error processing {file_path}: {e}") return None def build_index(self, parallel: bool = True, max_workers: Optional[int] = None) -> Dict[str, Any]: """ Build the complete index using Strategy pattern with parallel processing. 
Args: parallel: Whether to use parallel processing (default: True) max_workers: Maximum number of worker processes/threads (default: CPU count) Returns: Complete JSON index with metadata, symbols, and file information """ logger.info(f"Building JSON index using Strategy pattern (parallel={parallel})...") start_time = time.time() all_symbols = {} all_files = {} languages = set() specialized_count = 0 fallback_count = 0 # Get specialized extensions for tracking specialized_extensions = set(self.strategy_factory.get_specialized_extensions()) # Get list of files to process files_to_process = self._get_supported_files() total_files = len(files_to_process) if total_files == 0: logger.warning("No files to process") return self._create_empty_index() logger.info(f"Processing {total_files} files...") if parallel and total_files > 1: # Use ThreadPoolExecutor for I/O-bound file reading # ProcessPoolExecutor has issues with strategy sharing if max_workers is None: max_workers = min(os.cpu_count() or 4, total_files) logger.info(f"Using parallel processing with {max_workers} workers") with ThreadPoolExecutor(max_workers=max_workers) as executor: # Submit all tasks future_to_file = { executor.submit(self._process_file, file_path, specialized_extensions): file_path for file_path in files_to_process } # Process completed tasks processed = 0 for future in as_completed(future_to_file): file_path = future_to_file[future] result = future.result() if result: symbols, file_info_dict, language, is_specialized = result all_symbols.update(symbols) all_files.update(file_info_dict) languages.add(language) if is_specialized: specialized_count += 1 else: fallback_count += 1 processed += 1 if processed % 100 == 0: logger.debug(f"Processed {processed}/{total_files} files") else: # Sequential processing logger.info("Using sequential processing") for file_path in files_to_process: result = self._process_file(file_path, specialized_extensions) if result: symbols, file_info_dict, language, is_specialized = result all_symbols.update(symbols) all_files.update(file_info_dict) languages.add(language) if is_specialized: specialized_count += 1 else: fallback_count += 1 # Build index metadata metadata = IndexMetadata( project_path=self.project_path, indexed_files=len(all_files), index_version="2.0.0-strategy", timestamp=time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()), languages=sorted(list(languages)), total_symbols=len(all_symbols), specialized_parsers=specialized_count, fallback_files=fallback_count ) # Assemble final index index = { "metadata": asdict(metadata), "symbols": {k: asdict(v) for k, v in all_symbols.items()}, "files": {k: asdict(v) for k, v in all_files.items()} } # Cache in memory self.in_memory_index = index elapsed = time.time() - start_time logger.info(f"Built index with {len(all_symbols)} symbols from {len(all_files)} files in {elapsed:.2f}s") logger.info(f"Languages detected: {sorted(languages)}") logger.info(f"Strategy usage: {specialized_count} specialized, {fallback_count} fallback") return index def _create_empty_index(self) -> Dict[str, Any]: """Create an empty index structure.""" metadata = IndexMetadata( project_path=self.project_path, indexed_files=0, index_version="2.0.0-strategy", timestamp=time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()), languages=[], total_symbols=0, specialized_parsers=0, fallback_files=0 ) return { "metadata": asdict(metadata), "symbols": {}, "files": {} } def get_index(self) -> Optional[Dict[str, Any]]: """Get the current in-memory index.""" return self.in_memory_index def 
clear_index(self): """Clear the in-memory index.""" self.in_memory_index = None logger.debug("Cleared in-memory index") def _get_supported_files(self) -> List[str]: """ Get all supported files in the project using centralized filtering. Returns: List of file paths that can be parsed """ supported_files = [] base_path = Path(self.project_path) try: for root, dirs, files in os.walk(self.project_path): # Filter directories in-place using centralized logic dirs[:] = [d for d in dirs if not self.file_filter.should_exclude_directory(d)] # Filter files using centralized logic for file in files: file_path = Path(root) / file if self.file_filter.should_process_path(file_path, base_path): supported_files.append(str(file_path)) except Exception as e: logger.error(f"Error scanning directory {self.project_path}: {e}") logger.debug(f"Found {len(supported_files)} supported files") return supported_files def build_shallow_file_list(self) -> List[str]: """ Build a minimal shallow index consisting of relative file paths only. This method does not read file contents. It enumerates supported files using centralized filtering and returns normalized relative paths with forward slashes for cross-platform consistency. Returns: List of relative file paths (using '/'). """ try: absolute_files = self._get_supported_files() result: List[str] = [] for abs_path in absolute_files: rel_path = os.path.relpath(abs_path, self.project_path).replace('\\', '/') # Normalize leading './' if rel_path.startswith('./'): rel_path = rel_path[2:] result.append(rel_path) return result except Exception as e: logger.error(f"Failed to build shallow file list: {e}") return [] def save_index(self, index: Dict[str, Any], index_path: str) -> bool: """ Save index to disk. Args: index: Index data to save index_path: Path where to save the index Returns: True if successful, False otherwise """ try: import json with open(index_path, 'w', encoding='utf-8') as f: json.dump(index, f, indent=2, ensure_ascii=False) logger.info(f"Saved index to {index_path}") return True except Exception as e: logger.error(f"Failed to save index to {index_path}: {e}") return False def load_index(self, index_path: str) -> Optional[Dict[str, Any]]: """ Load index from disk. Args: index_path: Path to the index file Returns: Index data if successful, None otherwise """ try: if not os.path.exists(index_path): logger.debug(f"Index file not found: {index_path}") return None import json with open(index_path, 'r', encoding='utf-8') as f: index = json.load(f) # Cache in memory self.in_memory_index = index logger.info(f"Loaded index from {index_path}") return index except Exception as e: logger.error(f"Failed to load index from {index_path}: {e}") return None def get_parsing_statistics(self) -> Dict[str, Any]: """ Get detailed statistics about parsing capabilities. 
Returns: Dictionary with parsing statistics and strategy information """ strategy_info = self.strategy_factory.get_strategy_info() return { "total_strategies": len(strategy_info), "specialized_languages": [lang for lang in strategy_info.keys() if not lang.startswith('fallback_')], "fallback_languages": [lang.replace('fallback_', '') for lang in strategy_info.keys() if lang.startswith('fallback_')], "total_extensions": len(self.strategy_factory.get_all_supported_extensions()), "specialized_extensions": len(self.strategy_factory.get_specialized_extensions()), "fallback_extensions": len(self.strategy_factory.get_fallback_extensions()), "strategy_details": strategy_info } def get_file_symbols(self, file_path: str) -> List[Dict[str, Any]]: """ Get symbols for a specific file. Args: file_path: Relative path to the file Returns: List of symbols in the file """ if not self.in_memory_index: logger.warning("Index not loaded") return [] try: # Normalize file path file_path = file_path.replace('\\', '/') if file_path.startswith('./'): file_path = file_path[2:] # Get file info file_info = self.in_memory_index["files"].get(file_path) if not file_info: logger.warning(f"File not found in index: {file_path}") return [] # Work directly with global symbols for this file global_symbols = self.in_memory_index.get("symbols", {}) result = [] # Find all symbols for this file directly from global symbols for symbol_id, symbol_data in global_symbols.items(): symbol_file = symbol_data.get("file", "").replace("\\", "/") # Check if this symbol belongs to our file if symbol_file == file_path: symbol_type = symbol_data.get("type", "unknown") symbol_name = symbol_id.split("::")[-1] # Extract symbol name from ID # Create symbol info symbol_info = { "name": symbol_name, "called_by": symbol_data.get("called_by", []), "line": symbol_data.get("line"), "signature": symbol_data.get("signature") } # Categorize by type if symbol_type in ["function", "method"]: result.append(symbol_info) elif symbol_type == "class": result.append(symbol_info) # Sort by line number for consistent ordering result.sort(key=lambda x: x.get("line", 0)) return result except Exception as e: logger.error(f"Error getting file symbols for {file_path}: {e}") return []
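
For context, the builder only assumes that each strategy returned by StrategyFactory.get_strategy() exposes a parse_file(rel_path, content) method returning a symbol mapping plus a file-info object with a language attribute, and that both serialize cleanly through asdict(). A minimal sketch of that contract follows; the field and class names are assumptions inferred from how the builder reads the index (e.g. "type", "file", "line", "called_by" in get_file_symbols), not the project's actual strategies/models modules:

# Sketch of the strategy contract JSONIndexBuilder relies on.
# All names here are illustrative assumptions, not the real implementation.
from dataclasses import dataclass, field
from typing import Dict, List, Tuple


@dataclass
class SymbolInfo:
    """Assumed shape: the fields asdict() serializes per symbol."""
    type: str                              # "function", "method", "class", ...
    file: str                              # relative path, forward slashes
    line: int
    signature: str = ""
    called_by: List[str] = field(default_factory=list)


@dataclass
class FileInfo:
    """Assumed shape: per-file metadata; the builder reads .language."""
    language: str
    line_count: int = 0


class LineCountFallbackStrategy:
    """Hypothetical fallback strategy: records the file, extracts no symbols."""

    def parse_file(self, rel_path: str, content: str) -> Tuple[Dict[str, SymbolInfo], FileInfo]:
        # A real specialized strategy would parse `content` and return entries
        # keyed like "path::symbol_name"; this fallback reports only the file.
        info = FileInfo(language="text", line_count=content.count("\n") + 1)
        return {}, info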
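
And a usage sketch exercising the public API above (build_index, save_index, load_index, get_file_symbols). The import path and file paths are hypothetical placeholders; the class lives in a package alongside .strategies, .models, and ..utils, so import it from wherever that package is rooted:

# Usage sketch for JSONIndexBuilder; import path is a hypothetical placeholder.
from code_index.json_index_builder import JSONIndexBuilder

builder = JSONIndexBuilder("/path/to/project", additional_excludes=["vendor"])

# Build in parallel (ThreadPoolExecutor under the hood) and persist to disk.
index = builder.build_index(parallel=True)
print(index["metadata"]["indexed_files"], "files,",
      index["metadata"]["total_symbols"], "symbols")
builder.save_index(index, "/tmp/project_index.json")

# Later: reload the index and query symbols for one file, using a relative
# forward-slash path as the index stores them.
builder.load_index("/tmp/project_index.json")
for sym in builder.get_file_symbols("src/main.py"):
    print(sym["line"], sym["name"], sym["signature"])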
