code-index-mcp
by johnhuang316
- code-index-mcp
- src
- code_index_mcp
"""
Code Index MCP Server
This MCP server allows LLMs to index, search, and analyze code from a project directory.
It provides tools for file discovery, content retrieval, and code analysis.
"""
from contextlib import asynccontextmanager
from dataclasses import dataclass
from typing import AsyncIterator, Dict, List, Optional, Tuple, Any
import os
import pathlib
import json
import fnmatch
import sys
from mcp.server.fastmcp import FastMCP, Context, Image
from mcp import types
# Import the ProjectSettings class - using relative import
from .project_settings import ProjectSettings
# Create the MCP server
mcp = FastMCP("CodeIndexer", dependencies=["pathlib"])
# In-memory references (will be loaded from persistent storage)
file_index = {}
code_content_cache = {}
supported_extensions = [
'.py', '.js', '.ts', '.jsx', '.tsx', '.java', '.c', '.cpp', '.h', '.hpp',
'.cs', '.go', '.rb', '.php', '.swift', '.kt', '.rs', '.scala', '.sh',
'.bash', '.html', '.css', '.scss', '.md', '.json', '.xml', '.yml', '.yaml'
]
@dataclass
class CodeIndexerContext:
"""Context for the Code Indexer MCP server."""
base_path: str
settings: ProjectSettings
file_count: int = 0
@asynccontextmanager
async def indexer_lifespan(server: FastMCP) -> AsyncIterator[CodeIndexerContext]:
"""Manage the lifecycle of the Code Indexer MCP server."""
# We will not set a default base_path
# The user must explicitly set the project path before using the system
base_path = "" # Empty string to indicate no path is set
# Initialize the settings manager with a temporary path
# This will be properly set when the user calls set_project_path
settings = ProjectSettings(base_path or os.getcwd())
# Initialize the context
context = CodeIndexerContext(
base_path=base_path,
settings=settings
)
# Try to load existing index and cache
global file_index, code_content_cache
loaded_index = settings.load_index()
if loaded_index:
file_index = loaded_index
context.file_count = _count_files(file_index)
loaded_cache = settings.load_cache()
if loaded_cache:
code_content_cache = loaded_cache
try:
# Yield the context to the server
yield context
finally:
# Save index and cache on shutdown
if file_index:
settings.save_index(file_index)
if code_content_cache:
settings.save_cache(code_content_cache)
# Initialize the server with our lifespan manager
mcp = FastMCP("CodeIndexer", lifespan=indexer_lifespan)
# ----- RESOURCES -----
@mcp.resource("config://code-indexer")
def get_config() -> str:
"""Get the current configuration of the Code Indexer."""
ctx = mcp.get_context()
# Get the base path from context
base_path = ctx.request_context.lifespan_context.base_path
# Check if base_path is set
if not base_path:
return json.dumps({
"status": "not_configured",
"message": "Project path not set. Please use set_project_path to set a project directory first.",
"supported_extensions": supported_extensions
}, indent=2)
# Get file count
file_count = ctx.request_context.lifespan_context.file_count
# Get settings stats
settings = ctx.request_context.lifespan_context.settings
settings_stats = settings.get_stats()
config = {
"base_path": base_path,
"supported_extensions": supported_extensions,
"file_count": file_count,
"settings_directory": settings.settings_path,
"settings_stats": settings_stats
}
return json.dumps(config, indent=2)
@mcp.resource("files://{file_path}")
def get_file_content(file_path: str) -> str:
"""Get the content of a specific file."""
ctx = mcp.get_context()
# Get the base path from context
base_path = ctx.request_context.lifespan_context.base_path
# Check if base_path is set
if not base_path:
return "Error: Project path not set. Please use set_project_path to set a project directory first."
# Handle absolute paths (especially Windows paths starting with drive letters)
if os.path.isabs(file_path) or (len(file_path) > 1 and file_path[1] == ':'):
# Absolute paths are not allowed via this endpoint
return f"Error: Absolute file paths like '{file_path}' are not allowed. Please use paths relative to the project root."
# Normalize the file path
norm_path = os.path.normpath(file_path)
# Check for path traversal attempts
if "..\\" in norm_path or "../" in norm_path or norm_path.startswith(".."):
return f"Error: Invalid file path: {file_path} (directory traversal not allowed)"
# Construct the full path and verify it's within the project bounds
full_path = os.path.join(base_path, norm_path)
real_full_path = os.path.realpath(full_path)
real_base_path = os.path.realpath(base_path)
if not real_full_path.startswith(real_base_path):
return f"Error: Access denied. File path must be within project directory."
try:
with open(full_path, 'r', encoding='utf-8') as f:
content = f.read()
# Cache the content for faster retrieval later
code_content_cache[norm_path] = content
return content
except UnicodeDecodeError:
return f"Error: File {file_path} appears to be a binary file or uses unsupported encoding."
except Exception as e:
return f"Error reading file: {e}"
@mcp.resource("structure://project")
def get_project_structure() -> str:
"""Get the structure of the project as a JSON tree."""
ctx = mcp.get_context()
# Get the base path from context
base_path = ctx.request_context.lifespan_context.base_path
# Check if base_path is set
if not base_path:
return json.dumps({
"status": "not_configured",
"message": "Project path not set. Please use set_project_path to set a project directory first."
}, indent=2)
# Check if we need to refresh the index
if not file_index:
_index_project(base_path)
# Update file count in context
ctx.request_context.lifespan_context.file_count = _count_files(file_index)
# Save updated index
ctx.request_context.lifespan_context.settings.save_index(file_index)
return json.dumps(file_index, indent=2)
@mcp.resource("settings://stats")
def get_settings_stats() -> str:
"""Get statistics about the settings directory and files."""
ctx = mcp.get_context()
# Get settings manager from context
settings = ctx.request_context.lifespan_context.settings
# Get settings stats
stats = settings.get_stats()
return json.dumps(stats, indent=2)
# ----- TOOLS -----
@mcp.tool()
def set_project_path(path: str, ctx: Context) -> str:
"""Set the base project path for indexing."""
# Validate and normalize the path
try:
norm_path = os.path.normpath(path)
abs_path = os.path.abspath(norm_path)
if not os.path.exists(abs_path):
return f"Error: Path does not exist: {abs_path}"
if not os.path.isdir(abs_path):
return f"Error: Path is not a directory: {abs_path}"
# Clear existing in-memory index and cache
global file_index, code_content_cache
file_index.clear()
code_content_cache.clear()
# Update the base path in context
ctx.request_context.lifespan_context.base_path = abs_path
# Create a new settings manager for the new path
ctx.request_context.lifespan_context.settings = ProjectSettings(abs_path)
# Ensure .code_indexer is added to project's .gitignore
gitignore_path = os.path.join(abs_path, ".gitignore")
try:
# Check if .gitignore exists
if os.path.exists(gitignore_path):
# Read existing content
with open(gitignore_path, 'r', encoding='utf-8') as f:
content = f.read()
# Check if .code_indexer is already in .gitignore
if ".code_indexer/" not in content and ".code_indexer" not in content:
# Append to .gitignore
with open(gitignore_path, 'a', encoding='utf-8') as f:
f.write("\n# Code Index MCP cache directory\n.code_indexer/\n")
ctx.info(f"Added .code_indexer/ to project's .gitignore file.")
else:
# Create new .gitignore
with open(gitignore_path, 'w', encoding='utf-8') as f:
f.write("# Code Index MCP cache directory\n.code_indexer/\n")
ctx.info(f"Created .gitignore file with .code_indexer/ entry.")
except Exception as gitignore_error:
ctx.info(f"Note: Could not update .gitignore file: {gitignore_error}")
# Try to load existing index and cache
loaded_index = ctx.request_context.lifespan_context.settings.load_index()
if loaded_index:
file_index = loaded_index
file_count = _count_files(file_index)
ctx.request_context.lifespan_context.file_count = file_count
return f"Project path set to: {abs_path}. Loaded existing index with {file_count} files."
# If no existing index, create a new one
file_count = _index_project(abs_path)
ctx.request_context.lifespan_context.file_count = file_count
# Save the new index
ctx.request_context.lifespan_context.settings.save_index(file_index)
# Save project config
config = {
"base_path": abs_path,
"supported_extensions": supported_extensions,
"last_indexed": ctx.request_context.lifespan_context.settings.load_config().get('last_indexed', None)
}
ctx.request_context.lifespan_context.settings.save_config(config)
return f"Project path set to: {abs_path}. Indexed {file_count} files."
except Exception as e:
return f"Error setting project path: {e}"
@mcp.tool()
def search_code(query: str, ctx: Context, extensions: Optional[List[str]] = None, case_sensitive: bool = False) -> Dict[str, List[Tuple[int, str]]]:
"""
Search for code matches within the indexed files.
Returns a dictionary mapping filenames to lists of (line_number, line_content) tuples.
"""
base_path = ctx.request_context.lifespan_context.base_path
# Check if base_path is set
if not base_path:
return {"error": "Project path not set. Please use set_project_path to set a project directory first."}
# Check if we need to index the project
if not file_index:
_index_project(base_path)
ctx.request_context.lifespan_context.file_count = _count_files(file_index)
ctx.request_context.lifespan_context.settings.save_index(file_index)
results = {}
# Filter by extensions if provided
if extensions:
valid_extensions = [ext if ext.startswith('.') else f'.{ext}' for ext in extensions]
else:
valid_extensions = supported_extensions
# Process the search
for file_path, _info in _get_all_files(file_index):
# Check if the file has a supported extension
if not any(file_path.endswith(ext) for ext in valid_extensions):
continue
try:
# Get file content (from cache if available)
if file_path in code_content_cache:
content = code_content_cache[file_path]
else:
full_path = os.path.join(base_path, file_path)
with open(full_path, 'r', encoding='utf-8') as f:
content = f.read()
code_content_cache[file_path] = content
# Search for matches
matches = []
for i, line in enumerate(content.splitlines(), 1):
if (case_sensitive and query in line) or (not case_sensitive and query.lower() in line.lower()):
matches.append((i, line.strip()))
if matches:
results[file_path] = matches
except Exception as e:
ctx.info(f"Error searching file {file_path}: {e}")
# Save the updated cache
ctx.request_context.lifespan_context.settings.save_cache(code_content_cache)
return results
@mcp.tool()
def find_files(pattern: str, ctx: Context) -> List[str]:
"""
Find files in the project that match the given pattern.
Supports glob patterns like *.py or **/*.js.
"""
base_path = ctx.request_context.lifespan_context.base_path
# Check if base_path is set
if not base_path:
return ["Error: Project path not set. Please use set_project_path to set a project directory first."]
# Check if we need to index the project
if not file_index:
_index_project(base_path)
ctx.request_context.lifespan_context.file_count = _count_files(file_index)
ctx.request_context.lifespan_context.settings.save_index(file_index)
matching_files = []
for file_path, _info in _get_all_files(file_index):
if fnmatch.fnmatch(file_path, pattern):
matching_files.append(file_path)
return matching_files
@mcp.tool()
def get_file_summary(file_path: str, ctx: Context) -> Dict[str, Any]:
"""
Get a summary of a specific file, including:
- Line count
- Function/class definitions (for supported languages)
- Import statements
- Basic complexity metrics
"""
base_path = ctx.request_context.lifespan_context.base_path
# Check if base_path is set
if not base_path:
return {"error": "Project path not set. Please use set_project_path to set a project directory first."}
# Normalize the file path
norm_path = os.path.normpath(file_path)
if norm_path.startswith('..'):
return {"error": f"Invalid file path: {file_path}"}
full_path = os.path.join(base_path, norm_path)
try:
# Get file content
if norm_path in code_content_cache:
content = code_content_cache[norm_path]
else:
with open(full_path, 'r', encoding='utf-8') as f:
content = f.read()
code_content_cache[norm_path] = content
# Save the updated cache
ctx.request_context.lifespan_context.settings.save_cache(code_content_cache)
# Basic file info
lines = content.splitlines()
line_count = len(lines)
# File extension for language-specific analysis
_, ext = os.path.splitext(norm_path)
summary = {
"file_path": norm_path,
"line_count": line_count,
"size_bytes": os.path.getsize(full_path),
"extension": ext,
}
# Language-specific analysis
if ext == '.py':
# Python analysis
imports = []
classes = []
functions = []
for i, line in enumerate(lines):
line = line.strip()
# Check for imports
if line.startswith('import ') or line.startswith('from '):
imports.append(line)
# Check for class definitions
if line.startswith('class '):
classes.append({
"line": i + 1,
"name": line.replace('class ', '').split('(')[0].split(':')[0].strip()
})
# Check for function definitions
if line.startswith('def '):
functions.append({
"line": i + 1,
"name": line.replace('def ', '').split('(')[0].strip()
})
summary.update({
"imports": imports,
"classes": classes,
"functions": functions,
"import_count": len(imports),
"class_count": len(classes),
"function_count": len(functions),
})
elif ext in ['.js', '.jsx', '.ts', '.tsx']:
# JavaScript/TypeScript analysis
imports = []
classes = []
functions = []
for i, line in enumerate(lines):
line = line.strip()
# Check for imports
if line.startswith('import ') or line.startswith('require('):
imports.append(line)
# Check for class definitions
if line.startswith('class ') or 'class ' in line:
class_name = ""
if 'class ' in line:
parts = line.split('class ')[1]
class_name = parts.split(' ')[0].split('{')[0].split('extends')[0].strip()
classes.append({
"line": i + 1,
"name": class_name
})
# Check for function definitions
if 'function ' in line or '=>' in line:
functions.append({
"line": i + 1,
"content": line
})
summary.update({
"imports": imports,
"classes": classes,
"functions": functions,
"import_count": len(imports),
"class_count": len(classes),
"function_count": len(functions),
})
return summary
except Exception as e:
return {"error": f"Error analyzing file: {e}"}
@mcp.tool()
def refresh_index(ctx: Context) -> str:
"""Refresh the project index."""
base_path = ctx.request_context.lifespan_context.base_path
# Check if base_path is set
if not base_path:
return "Error: Project path not set. Please use set_project_path to set a project directory first."
# Clear existing index
global file_index
file_index.clear()
# Re-index the project
file_count = _index_project(base_path)
ctx.request_context.lifespan_context.file_count = file_count
# Save the updated index
ctx.request_context.lifespan_context.settings.save_index(file_index)
# Update the last indexed timestamp in config
config = ctx.request_context.lifespan_context.settings.load_config()
ctx.request_context.lifespan_context.settings.save_config({
**config,
'last_indexed': ctx.request_context.lifespan_context.settings._get_timestamp()
})
return f"Project re-indexed. Found {file_count} files."
@mcp.tool()
def get_settings_info(ctx: Context) -> Dict[str, Any]:
"""Get information about the project settings."""
base_path = ctx.request_context.lifespan_context.base_path
# Check if base_path is set
if not base_path:
return {
"status": "not_configured",
"message": "Project path not set. Please use set_project_path to set a project directory first."
}
settings = ctx.request_context.lifespan_context.settings
# Get config
config = settings.load_config()
# Get stats
stats = settings.get_stats()
return {
"settings_directory": settings.settings_path,
"config": config,
"stats": stats,
"exists": os.path.exists(settings.settings_path)
}
@mcp.tool()
def clear_settings(ctx: Context) -> str:
"""Clear all settings and cached data."""
base_path = ctx.request_context.lifespan_context.base_path
# Check if base_path is set
if not base_path:
return "Error: Project path not set. Please use set_project_path to set a project directory first."
settings = ctx.request_context.lifespan_context.settings
# Clear all settings files
settings.clear()
# Clear in-memory cache and index
global file_index, code_content_cache
file_index.clear()
code_content_cache.clear()
return f"All settings and cache cleared from {settings.settings_path}"
# ----- PROMPTS -----
@mcp.prompt()
def analyze_code(file_path: str = "", query: str = "") -> list[types.PromptMessage]:
"""Prompt for analyzing code in the project."""
messages = [
types.PromptMessage(role="user", content=types.TextContent(type="text", text=f"""I need you to analyze some code from my project.
{f'Please analyze the file: {file_path}' if file_path else ''}
{f'I want to understand: {query}' if query else ''}
First, let me give you some context about the project structure. Then, I'll provide the code to analyze.
""")),
types.PromptMessage(role="assistant", content=types.TextContent(type="text", text="I'll help you analyze the code. Let me first examine the project structure to get a better understanding of the codebase."))
]
return messages
@mcp.prompt()
def code_search(query: str = "") -> types.TextContent:
"""Prompt for searching code in the project."""
search_text = f"\"query\"" if not query else f"\"{query}\""
return types.TextContent(type="text", text=f"""I need to search through my codebase for {search_text}.
Please help me find all occurrences of this query and explain what each match means in its context.
Focus on the most relevant files and provide a brief explanation of how each match is used in the code.
If there are too many results, prioritize the most important ones and summarize the patterns you see.""")
@mcp.prompt()
def set_project() -> list[types.PromptMessage]:
"""Prompt for setting the project path."""
messages = [
types.PromptMessage(role="user", content=types.TextContent(type="text", text="""
I need to analyze code from a project, but I haven't set the project path yet. Please help me set up the project path and index the code.
First, I need to specify which project directory to analyze.
""")),
types.PromptMessage(role="assistant", content=types.TextContent(type="text", text="""
Before I can help you analyze any code, we need to set up the project path. This is a required first step.
Please provide the full path to your project folder. For example:
- Windows: "C:/Users/username/projects/my-project"
- macOS/Linux: "/home/username/projects/my-project"
Once you provide the path, I'll use the `set_project_path` tool to configure the code analyzer to work with your project.
"""))
]
return messages
# ----- HELPER FUNCTIONS -----
def _index_project(base_path: str) -> int:
"""
Create an index of the project files.
Returns the number of files indexed.
"""
file_count = 0
file_index.clear()
for root, dirs, files in os.walk(base_path):
# Skip hidden directories and common build/dependency directories
dirs[:] = [d for d in dirs if not d.startswith('.') and
d not in ['node_modules', 'venv', '__pycache__', 'build', 'dist']]
# Create relative path from base_path
rel_path = os.path.relpath(root, base_path)
current_dir = file_index
# Skip the '.' directory (base_path itself)
if rel_path != '.':
# Split the path and navigate/create the tree
path_parts = rel_path.replace('\\', '/').split('/')
for part in path_parts:
if part not in current_dir:
current_dir[part] = {}
current_dir = current_dir[part]
# Add files to current directory
for file in files:
# Skip hidden files and files with unsupported extensions
_, ext = os.path.splitext(file)
if file.startswith('.') or ext not in supported_extensions:
continue
# Store file information
file_path = os.path.join(rel_path, file).replace('\\', '/')
if rel_path == '.':
file_path = file
current_dir[file] = {
"type": "file",
"path": file_path,
"ext": ext
}
file_count += 1
return file_count
def _count_files(directory: Dict) -> int:
"""
Count the number of files in the index.
"""
count = 0
for name, value in directory.items():
if isinstance(value, dict):
if "type" in value and value["type"] == "file":
count += 1
else:
count += _count_files(value)
return count
def _get_all_files(directory: Dict, prefix: str = "") -> List[Tuple[str, Dict]]:
"""
Recursively get all files from the directory structure.
Returns a list of (file_path, file_info) tuples.
"""
result = []
for name, value in directory.items():
if isinstance(value, dict):
if "type" in value and value["type"] == "file":
result.append((value["path"], value))
else:
new_prefix = f"{prefix}/{name}" if prefix else name
result.extend(_get_all_files(value, new_prefix))
return result
def main():
"""Entry point for the code indexer."""
print("Starting Code Index MCP Server...", file=sys.stderr)
mcp.run()
if __name__ == "__main__":
main()