Skip to main content
Glama

code-index-mcp

""" Project Settings Management This module provides functionality for managing project settings and persistent data for the Code Index MCP server. """ import os import json import tempfile import hashlib from datetime import datetime from .constants import ( SETTINGS_DIR, CONFIG_FILE, INDEX_FILE ) from .search.base import SearchStrategy from .search.ugrep import UgrepStrategy from .search.ripgrep import RipgrepStrategy from .search.ag import AgStrategy from .search.grep import GrepStrategy from .search.basic import BasicSearchStrategy # Prioritized list of search strategies SEARCH_STRATEGY_CLASSES = [ UgrepStrategy, RipgrepStrategy, AgStrategy, GrepStrategy, BasicSearchStrategy, ] def _get_available_strategies() -> list[SearchStrategy]: """ Detect and return a list of available search strategy instances, ordered by preference. """ available = [] for strategy_class in SEARCH_STRATEGY_CLASSES: try: strategy = strategy_class() if strategy.is_available(): available.append(strategy) except Exception: pass return available class ProjectSettings: """Class for managing project settings and index data""" def __init__(self, base_path, skip_load=False): """Initialize project settings Args: base_path (str): Base path of the project skip_load (bool): Whether to skip loading files """ self.base_path = base_path self.skip_load = skip_load self.available_strategies: list[SearchStrategy] = [] self.refresh_available_strategies() # Ensure the base path of the temporary directory exists try: # Get system temporary directory system_temp = tempfile.gettempdir() # Check if the system temporary directory exists and is writable if not os.path.exists(system_temp): # Try using project directory as fallback if available if base_path and os.path.exists(base_path): system_temp = base_path else: # Use user's home directory as last resort system_temp = os.path.expanduser("~") if not os.access(system_temp, os.W_OK): # Try using project directory as fallback if available if base_path and os.path.exists(base_path) and os.access(base_path, os.W_OK): system_temp = base_path else: # Use user's home directory as last resort system_temp = os.path.expanduser("~") # Create code_indexer directory temp_base_dir = os.path.join(system_temp, SETTINGS_DIR) if not os.path.exists(temp_base_dir): os.makedirs(temp_base_dir, exist_ok=True) else: pass except Exception: # If unable to create temporary directory, use .code_indexer in project directory if available if base_path and os.path.exists(base_path): temp_base_dir = os.path.join(base_path, ".code_indexer") else: # Use home directory as last resort temp_base_dir = os.path.join(os.path.expanduser("~"), ".code_indexer") if not os.path.exists(temp_base_dir): os.makedirs(temp_base_dir, exist_ok=True) # Use system temporary directory to store index data try: if base_path: # Use hash of project path as unique identifier path_hash = hashlib.md5(base_path.encode()).hexdigest() self.settings_path = os.path.join(temp_base_dir, path_hash) else: # If no base path provided, use a default directory self.settings_path = os.path.join(temp_base_dir, "default") self.ensure_settings_dir() except Exception: # If error occurs, use .code_indexer in project or home directory as fallback if base_path and os.path.exists(base_path): fallback_dir = os.path.join(base_path, ".code_indexer", hashlib.md5(base_path.encode()).hexdigest()) else: fallback_dir = os.path.join(os.path.expanduser("~"), ".code_indexer", "default" if not base_path else hashlib.md5(base_path.encode()).hexdigest()) self.settings_path = fallback_dir if not os.path.exists(fallback_dir): os.makedirs(fallback_dir, exist_ok=True) def ensure_settings_dir(self): """Ensure settings directory exists""" try: if not os.path.exists(self.settings_path): # Create directory structure os.makedirs(self.settings_path, exist_ok=True) else: pass # Check if directory is writable if not os.access(self.settings_path, os.W_OK): # If directory is not writable, use .code_indexer in project or home directory as fallback if self.base_path and os.path.exists(self.base_path) and os.access(self.base_path, os.W_OK): fallback_dir = os.path.join(self.base_path, ".code_indexer", os.path.basename(self.settings_path)) else: fallback_dir = os.path.join(os.path.expanduser("~"), ".code_indexer", os.path.basename(self.settings_path)) self.settings_path = fallback_dir if not os.path.exists(fallback_dir): os.makedirs(fallback_dir, exist_ok=True) except Exception: # If unable to create settings directory, use .code_indexer in project or home directory if self.base_path and os.path.exists(self.base_path): fallback_dir = os.path.join(self.base_path, ".code_indexer", hashlib.md5(self.base_path.encode()).hexdigest()) else: fallback_dir = os.path.join(os.path.expanduser("~"), ".code_indexer", "default" if not self.base_path else hashlib.md5(self.base_path.encode()).hexdigest()) self.settings_path = fallback_dir if not os.path.exists(fallback_dir): os.makedirs(fallback_dir, exist_ok=True) def get_config_path(self): """Get the path to the configuration file""" try: path = os.path.join(self.settings_path, CONFIG_FILE) # Ensure directory exists os.makedirs(os.path.dirname(path), exist_ok=True) return path except Exception: # If error occurs, use file in project or home directory as fallback if self.base_path and os.path.exists(self.base_path): return os.path.join(self.base_path, CONFIG_FILE) else: return os.path.join(os.path.expanduser("~"), CONFIG_FILE) def _get_timestamp(self): """Get current timestamp""" return datetime.now().isoformat() def save_config(self, config): """Save configuration data Args: config (dict): Configuration data """ try: config_path = self.get_config_path() # Add timestamp config['last_updated'] = self._get_timestamp() # Ensure directory exists os.makedirs(os.path.dirname(config_path), exist_ok=True) with open(config_path, 'w', encoding='utf-8') as f: json.dump(config, f, indent=2, ensure_ascii=False) return config except Exception: return config def load_config(self): """Load configuration data Returns: dict: Configuration data, or empty dict if file doesn't exist """ # If skip_load is set, return empty dict directly if self.skip_load: return {} try: config_path = self.get_config_path() if os.path.exists(config_path): try: with open(config_path, 'r', encoding='utf-8') as f: config = json.load(f) return config except (json.JSONDecodeError, UnicodeDecodeError): # If file is corrupted, return empty dict return {} else: pass return {} except Exception: return {} def save_index(self, index_data): """Save code index in JSON format Args: index_data: Index data as dictionary or JSON string """ try: index_path = self.get_index_path() # Ensure directory exists dir_path = os.path.dirname(index_path) if not os.path.exists(dir_path): os.makedirs(dir_path, exist_ok=True) # Check if directory is writable if not os.access(dir_path, os.W_OK): # Use project or home directory as fallback if self.base_path and os.path.exists(self.base_path): index_path = os.path.join(self.base_path, INDEX_FILE) else: index_path = os.path.join(os.path.expanduser("~"), INDEX_FILE) # Convert to JSON string if it's an object with to_json method if hasattr(index_data, 'to_json'): json_data = index_data.to_json() elif isinstance(index_data, str): json_data = index_data else: # Assume it's a dictionary and convert to JSON json_data = json.dumps(index_data, indent=2, default=str) with open(index_path, 'w', encoding='utf-8') as f: f.write(json_data) except Exception: # Try saving to project or home directory try: if self.base_path and os.path.exists(self.base_path): fallback_path = os.path.join(self.base_path, INDEX_FILE) else: fallback_path = os.path.join(os.path.expanduser("~"), INDEX_FILE) # Convert to JSON string if it's an object with to_json method if hasattr(index_data, 'to_json'): json_data = index_data.to_json() elif isinstance(index_data, str): json_data = index_data else: json_data = json.dumps(index_data, indent=2, default=str) with open(fallback_path, 'w', encoding='utf-8') as f: f.write(json_data) except Exception: pass def load_index(self): """Load code index from JSON format Returns: dict: Index data, or None if file doesn't exist or has errors """ # If skip_load is set, return None directly if self.skip_load: return None try: index_path = self.get_index_path() if os.path.exists(index_path): try: with open(index_path, 'r', encoding='utf-8') as f: index_data = json.load(f) return index_data except (json.JSONDecodeError, UnicodeDecodeError): # If file is corrupted, return None return None except Exception: return None else: # Try loading from project or home directory if self.base_path and os.path.exists(self.base_path): fallback_path = os.path.join(self.base_path, INDEX_FILE) else: fallback_path = os.path.join(os.path.expanduser("~"), INDEX_FILE) if os.path.exists(fallback_path): try: with open(fallback_path, 'r', encoding='utf-8') as f: index_data = json.load(f) return index_data except Exception: pass return None except Exception: return None def cleanup_legacy_files(self) -> None: """Clean up any legacy index files found.""" try: legacy_files = [ os.path.join(self.settings_path, "file_index.pickle"), os.path.join(self.settings_path, "content_cache.pickle"), os.path.join(self.settings_path, INDEX_FILE) # Legacy JSON ] for legacy_file in legacy_files: if os.path.exists(legacy_file): try: os.remove(legacy_file) except Exception: pass except Exception: pass def clear(self): """Clear config and index files""" try: if os.path.exists(self.settings_path): # Check if directory is writable if not os.access(self.settings_path, os.W_OK): return # Delete specific files only (config.json and index.json) files_to_delete = [CONFIG_FILE, INDEX_FILE] for filename in files_to_delete: file_path = os.path.join(self.settings_path, filename) try: if os.path.isfile(file_path): os.unlink(file_path) except Exception: pass else: pass except Exception: pass def get_stats(self): """Get statistics for the settings directory Returns: dict: Dictionary containing file sizes and update times """ try: stats = { 'settings_path': self.settings_path, 'exists': os.path.exists(self.settings_path), 'is_directory': os.path.isdir(self.settings_path) if os.path.exists(self.settings_path) else False, 'writable': os.access(self.settings_path, os.W_OK) if os.path.exists(self.settings_path) else False, 'files': {}, 'temp_dir': tempfile.gettempdir(), 'base_path': self.base_path } if stats['exists'] and stats['is_directory']: try: # Get all files in the directory all_files = os.listdir(self.settings_path) stats['all_files'] = all_files # Get details for specific files for filename in [CONFIG_FILE, INDEX_FILE]: file_path = os.path.join(self.settings_path, filename) if os.path.exists(file_path): try: file_stats = os.stat(file_path) stats['files'][filename] = { 'path': file_path, 'size_bytes': file_stats.st_size, 'last_modified': datetime.fromtimestamp(file_stats.st_mtime).isoformat(), 'readable': os.access(file_path, os.R_OK), 'writable': os.access(file_path, os.W_OK) } except Exception as e: stats['files'][filename] = { 'path': file_path, 'error': str(e) } except Exception as e: stats['list_error'] = str(e) # Check fallback path if self.base_path and os.path.exists(self.base_path): fallback_dir = os.path.join(self.base_path, ".code_indexer") else: fallback_dir = os.path.join(os.path.expanduser("~"), ".code_indexer") stats['fallback_path'] = fallback_dir stats['fallback_exists'] = os.path.exists(fallback_dir) stats['fallback_is_directory'] = os.path.isdir(fallback_dir) if os.path.exists(fallback_dir) else False return stats except Exception as e: return { 'error': str(e), 'settings_path': self.settings_path, 'temp_dir': tempfile.gettempdir(), 'base_path': self.base_path } def get_search_tools_config(self): """Get the configuration of available search tools. Returns: dict: A dictionary containing the list of available tool names. """ return { "available_tools": [s.name for s in self.available_strategies], "preferred_tool": self.get_preferred_search_tool().name if self.available_strategies else None } def get_preferred_search_tool(self) -> SearchStrategy | None: """Get the preferred search tool based on availability and priority. Returns: SearchStrategy: An instance of the preferred search strategy, or None. """ if not self.available_strategies: self.refresh_available_strategies() return self.available_strategies[0] if self.available_strategies else None def refresh_available_strategies(self): """ Force a refresh of the available search tools list. """ self.available_strategies = _get_available_strategies() def get_file_watcher_config(self) -> dict: """ Get file watcher specific configuration. Returns: dict: File watcher configuration with defaults """ config = self.load_config() default_config = { "enabled": True, "debounce_seconds": 6.0, "additional_exclude_patterns": [], "monitored_extensions": [], # Empty = use all supported extensions "exclude_patterns": [ ".git", ".svn", ".hg", "node_modules", "__pycache__", ".venv", "venv", ".DS_Store", "Thumbs.db", "dist", "build", "target", ".idea", ".vscode", ".pytest_cache", ".coverage", ".tox", "bin", "obj" ] } # Merge with loaded config file_watcher_config = config.get("file_watcher", {}) for key, default_value in default_config.items(): if key not in file_watcher_config: file_watcher_config[key] = default_value return file_watcher_config def update_file_watcher_config(self, updates: dict) -> None: """ Update file watcher configuration. Args: updates: Dictionary of configuration updates """ config = self.load_config() if "file_watcher" not in config: config["file_watcher"] = self.get_file_watcher_config() config["file_watcher"].update(updates) self.save_config(config)

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/johnhuang316/code-index-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server