Skip to main content
Glama

Workspace Code Search MCP Server

by LuotoCompany
code_indexer_server.py25.5 kB
#!/usr/bin/env python3 import os import logging import json from typing import List, Set import chromadb from chromadb.config import Settings from chromadb.utils import embedding_functions from tree_sitter_language_pack import get_parser import asyncio from fastmcp import FastMCP from watchdog.observers import Observer from watchdog.events import FileSystemEventHandler # Import LlamaIndex components try: from llama_index.core import Document from llama_index.core.node_parser import CodeSplitter from llama_index.core import SimpleDirectoryReader print("LlamaIndex dependencies found.") except ImportError as e: print(f"Error: {e}") print("Please install the required dependencies:") print("pip install llama-index llama-index-readers-file " "llama-index-embeddings-huggingface") exit(1) # Configure logging logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s' ) logger = logging.getLogger(__name__) # Default directories to ignore DEFAULT_IGNORE_DIRS = { "__pycache__", "node_modules", ".git", "build", "dist", ".venv", "venv", "env", ".pytest_cache", ".ipynb_checkpoints" } # Default files to ignore DEFAULT_IGNORE_FILES = { "package-lock.json", "yarn.lock", "pnpm-lock.yaml", "poetry.lock", "Pipfile.lock", "Gemfile.lock", "composer.lock", ".DS_Store", ".env", ".env.local", ".env.development", ".env.test", ".env.production", "*.pyc", "*.pyo", "*.pyd", "*.so", "*.dll", "*.dylib", ".coverage", "coverage.xml", ".eslintcache", ".tsbuildinfo" } # Default file extensions to include DEFAULT_FILE_EXTENSIONS = { ".py", ".js", ".jsx", ".ts", ".tsx", ".java", ".c", ".cpp", ".h", ".hpp", ".cs", ".go", ".rb", ".php", ".swift", ".kt", ".rs", ".scala", ".sh", ".html", ".css", ".sql", ".md", ".json", ".yaml", ".yml", ".toml" } # Global variables config = None chroma_client = None embedding_function = None mcp = FastMCP(title="Code Indexer Server") observers = [] def sanitize_collection_name(folder_name: str) -> str: """Convert folder name to a valid collection name by replacing forward slashes with underscores.""" return folder_name.replace("/", "_") class CodeIndexerEventHandler(FileSystemEventHandler): def __init__(self, folder_name: str): self.folder_name = folder_name self.collection_name = sanitize_collection_name(folder_name) self.ignore_dirs = set(config["ignore_dirs"]) self.ignore_files = set(config["ignore_files"]) self.file_extensions = set(config["file_extensions"]) def on_created(self, event): if event.is_directory: return if is_valid_file( event.src_path, self.ignore_dirs, self.file_extensions, self.ignore_files ): self._handle_file_change(event.src_path) def on_modified(self, event): if event.is_directory: return if is_valid_file( event.src_path, self.ignore_dirs, self.file_extensions, self.ignore_files ): self._handle_file_change(event.src_path) def on_deleted(self, event): if event.is_directory: return if is_valid_file( event.src_path, self.ignore_dirs, self.file_extensions, self.ignore_files ): self._handle_file_deletion(event.src_path) def _handle_file_change(self, file_path: str): try: # Calculate relative path rel_path = os.path.relpath(file_path, config["projects_root"]) # Load and process the single file reader = SimpleDirectoryReader(input_files=[file_path]) documents = reader.load_data() if documents: # Update metadata with relative path documents[0].metadata["file_path"] = rel_path # Process and index the document process_and_index_documents( documents, self.collection_name, "chroma_db" ) logger.info(f"Indexed updated file: {rel_path}") except Exception as e: logger.error(f"Error processing file {file_path}: {e}") def _handle_file_deletion(self, file_path: str): try: # Calculate relative path rel_path = os.path.relpath(file_path, config["projects_root"]) # Get the collection collection = chroma_client.get_collection( name=self.collection_name, embedding_function=embedding_function ) # Delete all chunks from this file collection.delete( where={"file_path": rel_path} ) logger.info(f"Removed indexed chunks for deleted file: {rel_path}") except Exception as e: logger.error(f"Error removing chunks for {file_path}: {e}") def get_config_from_env(): """Get configuration from environment variables.""" projects_root = os.getenv("PROJECTS_ROOT", "/projects") folders_to_index = os.getenv("FOLDERS_TO_INDEX", "").split(",") folders_to_index = [f.strip() for f in folders_to_index if f.strip()] # Get additional ignore dirs and files from environment additional_ignore_dirs = os.getenv( "ADDITIONAL_IGNORE_DIRS", "" ).split(",") additional_ignore_dirs = [ d.strip() for d in additional_ignore_dirs if d.strip() ] additional_ignore_files = os.getenv( "ADDITIONAL_IGNORE_FILES", "" ).split(",") additional_ignore_files = [ f.strip() for f in additional_ignore_files if f.strip() ] # Combine default and additional ignore patterns ignore_dirs = list(DEFAULT_IGNORE_DIRS | set(additional_ignore_dirs)) ignore_files = list(DEFAULT_IGNORE_FILES | set(additional_ignore_files)) if not folders_to_index: logger.warning("No folders specified to index. Using root directory.") folders_to_index = [""] return { "projects_root": projects_root, "folders_to_index": folders_to_index, "ignore_dirs": ignore_dirs, "ignore_files": ignore_files, "file_extensions": list(DEFAULT_FILE_EXTENSIONS) } async def initialize_chromadb(): """Initialize ChromaDB and embedding function asynchronously.""" global config, chroma_client, embedding_function try: # Get configuration from environment config = get_config_from_env() logger.info("Configuration loaded successfully") # Initialize ChromaDB client with telemetry disabled chroma_client = chromadb.PersistentClient( path="chroma_db", settings=Settings(anonymized_telemetry=False) ) logger.info("ChromaDB client initialized") # Initialize embedding function embedding_function = ( embedding_functions.SentenceTransformerEmbeddingFunction( model_name="all-MiniLM-L6-v2" ) ) logger.info("Embedding function initialized") return True except Exception as e: logger.error(f"Error during initialization: {e}") # Still need to assign default values to global variables if config is None: config = {"projects_root": "", "folders_to_index": [""]} if chroma_client is None: # Create an empty client as a fallback try: chroma_client = chromadb.PersistentClient(path="chroma_db") except Exception as db_err: logger.error( f"Failed to create fallback ChromaDB client: {db_err}" ) if embedding_function is None: try: embedding_function = ( embedding_functions.SentenceTransformerEmbeddingFunction( model_name="all-MiniLM-L6-v2" ) ) except Exception as embed_err: logger.error( f"Failed to create fallback embedding function: " f"{embed_err}" ) return False def is_valid_file( file_path: str, ignore_dirs: Set[str], file_extensions: Set[str], ignore_files: Set[str] = None ) -> bool: """Check if a file should be processed based on its path and extension.""" # Check if path contains ignored directory parts = file_path.split(os.path.sep) for part in parts: if part in ignore_dirs: return False # Get file name and check against ignored files file_name = os.path.basename(file_path) # Use provided ignore_files or fall back to default files_to_ignore = ignore_files if ignore_files is not None else DEFAULT_IGNORE_FILES # Check exact matches if file_name in files_to_ignore: return False # Check wildcard patterns for pattern in files_to_ignore: if pattern.startswith("*"): if file_name.endswith(pattern[1:]): return False # Check file extension _, ext = os.path.splitext(file_path) return ext.lower() in file_extensions if file_extensions else True def load_documents( directory: str, ignore_dirs: Set[str] = DEFAULT_IGNORE_DIRS, file_extensions: Set[str] = DEFAULT_FILE_EXTENSIONS, ignore_files: Set[str] = None ) -> List[Document]: """Load documents from a directory, filtering out ignored paths.""" try: # Get all files recursively all_files = [] for root, dirs, files in os.walk(directory): # Skip ignored directories dirs[:] = [ d for d in dirs if d not in ignore_dirs and not d.startswith('.') ] for file in files: abs_file_path = os.path.join(root, file) if is_valid_file( abs_file_path, ignore_dirs, file_extensions, ignore_files ): # Calculate relative path from the directory being indexed rel_file_path = os.path.relpath(abs_file_path, directory) all_files.append((abs_file_path, rel_file_path)) if not all_files: logger.warning(f"No valid files found in {directory}") return [] # Load the filtered files using absolute paths for reading reader = SimpleDirectoryReader( input_files=[abs_path for abs_path, _ in all_files], exclude_hidden=True ) documents = reader.load_data() # Update the metadata to use relative paths for doc, (_, rel_path) in zip(documents, all_files): doc.metadata["file_path"] = rel_path logger.info(f"Loaded {len(documents)} documents from {directory}") return documents except Exception as e: logger.error(f"Error loading documents: {e}") return [] def process_and_index_documents( documents: List[Document], collection_name: str, persist_directory: str ) -> None: """Process documents with CodeSplitter and index them in ChromaDB.""" if not documents: logger.warning("No documents to process.") return try: # Try to get collection if it exists or create a new one collection = chroma_client.get_or_create_collection( name=collection_name, embedding_function=embedding_function, metadata={"hnsw:space": "cosine"} ) except Exception as e: logger.error(f"Error creating collection: {e}") return # Process each document total_nodes = 0 for doc in documents: try: # Extract file path from metadata file_path = doc.metadata.get("file_path", "unknown") file_name = os.path.basename(file_path) # Determine language from file extension _, ext = os.path.splitext(file_name) language = ext[1:] if ext else "text" # Remove the dot # Handle Markdown and other text files differently code_file_extensions = [ "py", "python", "js", "jsx", "ts", "tsx", "java", "c", "cpp", "h", "hpp", "cs", "go", "rb", "php", "swift", "kt", "rs", "scala" ] if language in code_file_extensions: # Determine parser language based on file extension parser_language = "python" # Default fallback if language in ["py", "python"]: parser_language = "python" elif language in ["js", "jsx", "ts", "tsx"]: parser_language = "javascript" elif language in ["java"]: parser_language = "java" elif language in ["c", "cpp", "h", "hpp"]: parser_language = "cpp" elif language in ["cs"]: parser_language = "csharp" elif language in ["go"]: parser_language = "go" elif language in ["rb"]: parser_language = "ruby" elif language in ["php"]: parser_language = "php" elif language in ["swift"]: parser_language = "swift" elif language in ["kt"]: parser_language = "kotlin" elif language in ["rs"]: parser_language = "rust" elif language in ["scala"]: parser_language = "scala" # Create parser and splitter for this specific language try: code_parser = get_parser(parser_language) splitter = CodeSplitter( language=parser_language, chunk_lines=40, chunk_lines_overlap=15, max_chars=1500, parser=code_parser ) nodes = splitter.get_nodes_from_documents([doc]) except Exception as e: logger.warning( f"Could not create parser for {parser_language}, " f"falling back to text-based splitting: {e}" ) # Fall back to text-based splitting nodes = [] lines = doc.text.split("\n") chunk_size = 40 overlap = 15 for i in range(0, len(lines), chunk_size - overlap): start_idx = i end_idx = min(i + chunk_size, len(lines)) if start_idx >= len(lines): continue chunk_text = "\n".join(lines[start_idx:end_idx]) if not chunk_text.strip(): continue from llama_index.core.schema import TextNode node = TextNode( text=chunk_text, metadata={ "start_line_number": start_idx + 1, "end_line_number": end_idx, "file_path": file_path, "file_name": file_name, } ) nodes.append(node) else: # For non-code files, manually split by lines nodes = [] lines = doc.text.split("\n") chunk_size = 40 overlap = 15 for i in range(0, len(lines), chunk_size - overlap): start_idx = i end_idx = min(i + chunk_size, len(lines)) if start_idx >= len(lines): continue chunk_text = "\n".join(lines[start_idx:end_idx]) if not chunk_text.strip(): continue from llama_index.core.schema import TextNode node = TextNode( text=chunk_text, metadata={ "start_line_number": start_idx + 1, "end_line_number": end_idx, "file_path": file_path, "file_name": file_name, } ) nodes.append(node) if not nodes: logger.warning(f"No nodes generated for {file_path}") continue logger.info(f"Processing {file_path}: {len(nodes)} chunks") # Prepare data for ChromaDB ids = [] texts = [] metadatas = [] for i, node in enumerate(nodes): start_line = node.metadata.get("start_line_number", 0) end_line = node.metadata.get("end_line_number", 0) if start_line == 0 or end_line == 0: start_line = 1 end_line = len(node.text.split("\n")) chunk_id = f"{file_path}_{start_line}_{end_line}_{i}" metadata = { "file_path": file_path, "file_name": file_name, "language": language, "start_line": start_line, "end_line": end_line, } ids.append(chunk_id) texts.append(node.text) metadatas.append(metadata) # Add nodes to ChromaDB collection collection.upsert( ids=ids, documents=texts, metadatas=metadatas ) total_nodes += len(nodes) except Exception as e: logger.error( f"Error processing document " f"{doc.metadata.get('file_path', 'unknown')}: {e}" ) logger.info( f"Successfully indexed {total_nodes} code chunks " f"across {len(documents)} files" ) async def perform_initial_indexing(folder: str) -> bool: """Check if collection exists and perform initial indexing if needed.""" try: folder_path = os.path.join(config["projects_root"], folder) if not os.path.exists(folder_path): logger.error(f"Folder not found: {folder_path}") return False collection_name = sanitize_collection_name(folder) # Check if collection exists try: chroma_client.get_collection( name=collection_name, embedding_function=embedding_function ) logger.info(f"Collection {collection_name} already exists, skipping initial indexing") return True except Exception: logger.info(f"Collection {collection_name} not found, performing initial indexing") # Load and process all documents in the folder documents = load_documents( folder_path, ignore_dirs=set(config["ignore_dirs"]), file_extensions=set(config["file_extensions"]), ignore_files=set(config["ignore_files"]) ) if documents: process_and_index_documents(documents, collection_name, "chroma_db") logger.info(f"Successfully performed initial indexing for {folder}") return True else: logger.warning(f"No documents found to index in {folder}") return False except Exception as e: logger.error(f"Error during initial indexing of {folder}: {e}") return False async def index_projects(): """Set up file system watchers for all configured projects.""" global observers try: for folder in config["folders_to_index"]: # First perform initial indexing if needed success = await perform_initial_indexing(folder) if not success: logger.error(f"Failed to perform initial indexing for {folder}") continue folder_path = os.path.join(config["projects_root"], folder) logger.info(f"Setting up file watcher for {folder}") # Create an observer and event handler for this folder observer = Observer() event_handler = CodeIndexerEventHandler(folder) observer.schedule(event_handler, folder_path, recursive=True) # Start the observer observer.start() observers.append(observer) logger.info(f"Started watching {folder}") # Keep the main thread alive while True: await asyncio.sleep(1) except Exception as e: logger.error(f"Error in file watching setup: {e}") # Clean up observers on error for observer in observers: observer.stop() observers.clear() @mcp.tool( name="search_code", description="""Search code using natural language queries. Args: query: Natural language query about the codebase project: Collection/folder name to search in. Use the current workspace name. n_results: Number of results to return (default: 5) threshold: Minimum relevance percentage to include results (default: 30.0) """ ) async def search_code( query: str, project: str, n_results: int = 5, threshold: float = 30.0 ) -> str: try: if not chroma_client or not embedding_function: logger.error("ChromaDB client or embedding function not initialized") return json.dumps({ "error": "Search system not properly initialized", "results": [], "total_results": 0 }) # Get all collections collection_names = chroma_client.list_collections() # Find matching collections matching_collections = [] project_name = project.lower() for collection_name in collection_names: # The collection name might be in format "customerX_project1" or just "project1" # We want to match if project_name fully matches the part after the last _ (if any) collection_parts = collection_name.lower().split('_') if collection_parts[-1] == project_name: matching_collections.append(collection_name) if not matching_collections: logger.error(f"No collections found matching project {project}") return json.dumps({ "error": f"No collections found matching project {project}", "results": [], "total_results": 0 }) # Search in all matching collections and combine results all_results = [] for collection_name in matching_collections: collection = chroma_client.get_collection(collection_name) results = collection.query( query_texts=[query], n_results=n_results, include=["documents", "metadatas", "distances"] ) if results["documents"] and results["documents"][0]: for doc, meta, distance in zip( results["documents"][0], results["metadatas"][0], results["distances"][0] ): similarity = (1 - distance) * 100 if similarity >= threshold: all_results.append({ "text": doc, "file_path": meta.get("file_path", "Unknown file"), "language": meta.get("language", "text"), "start_line": int(meta.get("start_line", 0)), "end_line": int(meta.get("end_line", 0)), "relevance": round(similarity, 1), "collection": collection.name # Add collection name for debugging }) # Sort results by relevance all_results.sort(key=lambda x: x["relevance"], reverse=True) # Take top n_results final_results = all_results[:n_results] return json.dumps({ "results": final_results, "total_results": len(final_results) }) except Exception as e: logger.error(f"Error in search_code: {str(e)}") return json.dumps({ "error": str(e), "results": [], "total_results": 0 }) # Run initialization before starting MCP async def main(): # Initialize ChromaDB before starting MCP success = await initialize_chromadb() if success: # Start file watching in background asyncio.create_task(index_projects()) logger.info("File watching task started") await mcp.run_sse_async() if __name__ == "__main__": asyncio.run(main())

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/LuotoCompany/cursor-local-indexing'

If you have feedback or need assistance with the MCP directory API, please join our Discord server