Code Analysis MCP Server
from pathlib import Path
from typing import Optional, List, Dict, Union, Set
from dataclasses import dataclass
from mcp.server.fastmcp import FastMCP
from mcp.server.fastmcp.prompts.base import UserMessage, AssistantMessage
import os
from pathspec import PathSpec
from pathspec.patterns import GitWildMatchPattern
@dataclass
class Summary:
file_count: int = 0
dir_count: int = 0
total_size: int = 0
@dataclass
class FileStructure:
path: str
type: str
size: Optional[int] = None
children: Optional[List['FileStructure']] = None
summary: Optional[Summary] = None
class RepoStructureAnalyzer:
def __init__(self, repo_path: Path, max_depth: int = 3, max_children: int = 100):
self.repo_path = repo_path
self.MAX_DEPTH = max_depth
self.MAX_CHILDREN = max_children
# Default patterns to always ignore
self.default_ignore_patterns = ['.git', '__pycache__', 'node_modules']
# Load .gitignore if it exists
self.gitignore_spec = self._load_gitignore()
def _load_gitignore(self) -> Optional[PathSpec]:
"""Load .gitignore patterns if the file exists."""
gitignore_path = self.repo_path / '.gitignore'
patterns = []
# Add default patterns
for pattern in self.default_ignore_patterns:
patterns.append(GitWildMatchPattern(pattern))
# Add patterns from .gitignore if it exists
if gitignore_path.exists() and gitignore_path.is_file():
try:
with open(gitignore_path, 'r') as f:
for line in f:
line = line.strip()
# Skip empty lines and comments
if line and not line.startswith('#'):
patterns.append(GitWildMatchPattern(line))
except Exception as e:
print(f"Error reading .gitignore: {e}")
return PathSpec(patterns)
def _is_safe_path(self, path: Path) -> bool:
"""Check if the given path is safe (no directory traversal)."""
try:
return path.resolve().is_relative_to(self.repo_path.resolve())
except ValueError:
return False
def format_structure(self, structure: FileStructure) -> str:
"""Format the file structure into a readable string."""
output = []
def format_size(size: int) -> str:
units = ['B', 'KB', 'MB', 'GB']
value = float(size)
index = 0
while value >= 1024 and index < len(units) - 1:
value /= 1024
index += 1
return f"{value:.1f} {units[index]}"
def format_item(item: FileStructure, level: int = 0) -> None:
indent = ' ' * level
if item.type == 'directory':
output.append(f"{indent}📁 {item.path}/")
if item.summary:
summary = item.summary
output.append(
f"{indent} Contains: {summary.file_count} files, "
f"{summary.dir_count} directories, "
f"{format_size(summary.total_size)}"
)
elif item.children:
for child in item.children:
format_item(child, level + 1)
else:
output.append(f"{indent}📄 {item.path} ({format_size(item.size or 0)})")
format_item(structure)
return '\n'.join(output)
def should_ignore(self, path: str) -> bool:
"""Check if path should be ignored based on gitignore patterns."""
return self.gitignore_spec.match_file(path)
def get_structure(
self,
current_path: Path,
relative_path: str = '',
current_depth: int = 0,
max_depth: Optional[int] = None
) -> FileStructure:
"""Recursively get the structure of files and directories."""
if max_depth is None:
max_depth = self.MAX_DEPTH
# Verify path safety
if not self._is_safe_path(current_path):
raise ValueError(f"Invalid path: {current_path}")
# Skip symbolic links
if current_path.is_symlink():
raise ValueError(f"Symbolic links are not supported: {current_path}")
stats = current_path.stat()
rel_path = relative_path or current_path.name
if relative_path and self.should_ignore(relative_path):
raise ValueError(f"Path {relative_path} is ignored")
if current_path.is_file():
return FileStructure(
path=rel_path,
type="file",
size=stats.st_size
)
if current_path.is_dir():
children = []
summary = Summary()
if current_depth < max_depth:
try:
entries = list(current_path.iterdir())
for entry in entries:
if len(children) >= self.MAX_CHILDREN:
if entry.is_file() and not entry.is_symlink():
summary.file_count += 1
summary.total_size += entry.stat().st_size
elif entry.is_dir() and not entry.is_symlink():
summary.dir_count += 1
continue
entry_relative_path = str(entry.relative_to(self.repo_path))
try:
if self.should_ignore(entry_relative_path):
continue
if entry.is_symlink():
continue
entry_stats = entry.stat()
if entry.is_file():
summary.file_count += 1
summary.total_size += entry_stats.st_size
elif entry.is_dir():
summary.dir_count += 1
child = self.get_structure(
entry,
entry_relative_path,
current_depth + 1,
max_depth
)
children.append(child)
except Exception as error:
print(f"Error processing {entry}: {error}")
continue
except Exception as error:
print(f"Error reading directory {current_path}: {error}")
return FileStructure(
path=rel_path,
type="directory",
children=children if children else None,
summary=summary if current_depth >= max_depth or len(children) >= self.MAX_CHILDREN else None
)
raise ValueError(f"Unsupported file type at {current_path}")
class FileReader:
def __init__(self, repo_path: Path):
self.repo_path = repo_path
self.MAX_SIZE = 1024 * 1024 # 1MB
self.MAX_LINES = 1000 # Maximum number of lines to return
def _detect_language(self, file_path: str) -> str:
"""Detect the programming language based on file extension."""
ext = Path(file_path).suffix.lower()
# Extensive mapping of file extensions to languages
language_map = {
# Programming Languages
'.py': 'python',
'.js': 'javascript',
'.jsx': 'javascript',
'.ts': 'typescript',
'.tsx': 'typescript',
'.java': 'java',
'.cpp': 'cpp',
'.cc': 'cpp',
'.hpp': 'cpp',
'.c': 'c',
'.h': 'c',
'.cs': 'csharp',
'.rb': 'ruby',
'.php': 'php',
'.go': 'go',
'.rs': 'rust',
'.swift': 'swift',
'.kt': 'kotlin',
'.scala': 'scala',
'.m': 'objective-c',
'.mm': 'objective-c',
# Web Technologies
'.html': 'html',
'.htm': 'html',
'.css': 'css',
'.scss': 'scss',
'.sass': 'scss',
'.less': 'less',
'.vue': 'vue',
'.svelte': 'svelte',
# Data & Config Files
'.json': 'json',
'.xml': 'xml',
'.yaml': 'yaml',
'.yml': 'yaml',
'.toml': 'toml',
'.ini': 'ini',
'.conf': 'config',
# Documentation
'.md': 'markdown',
'.markdown': 'markdown',
'.rst': 'restructuredtext',
'.tex': 'latex',
# Shell Scripts
'.sh': 'shell',
'.bash': 'shell',
'.zsh': 'shell',
'.fish': 'shell',
'.bat': 'batch',
'.cmd': 'batch',
'.ps1': 'powershell',
# Other Common Types
'.sql': 'sql',
'.r': 'r',
'.gradle': 'gradle',
'.dockerfile': 'dockerfile',
'.env': 'env',
'.gitignore': 'gitignore'
}
# Handle files without extension but specific names
if not ext:
filename = Path(file_path).name.lower()
name_map = {
'dockerfile': 'dockerfile',
'makefile': 'makefile',
'jenkinsfile': 'jenkinsfile',
'vagrantfile': 'ruby',
'.env': 'env',
'.gitignore': 'gitignore'
}
return name_map.get(filename, 'text')
return language_map.get(ext, 'text')
def read_file(self, file_path: str) -> Dict[str, Union[List[Dict[str, str]], bool]]:
"""Read and format file contents for LLM consumption."""
try:
full_path = self.repo_path / file_path
# Check if path is safe
if not full_path.resolve().is_relative_to(self.repo_path.resolve()):
return {
"content": [{
"type": "text",
"text": f"Error: Attempted to access file outside repository: {file_path}"
}],
"isError": True
}
# Check if file exists
if not full_path.exists():
return {
"content": [{
"type": "text",
"text": f"File {file_path} not found"
}],
"isError": True
}
# Check if it's a symbolic link
if full_path.is_symlink():
return {
"content": [{
"type": "text",
"text": f"Error: Symbolic links are not supported: {file_path}"
}],
"isError": True
}
# Get file stats
stats = full_path.stat()
# Check file size
if stats.st_size > self.MAX_SIZE:
return {
"content": [{
"type": "text",
"text": f"File {file_path} is too large ({stats.st_size} bytes). "
f"Maximum size is {self.MAX_SIZE} bytes."
}],
"isError": True
}
# Read file content with line limit
lines = []
line_count = 0
truncated = False
with open(full_path, 'r', encoding='utf-8') as f:
for line in f:
line_count += 1
if line_count <= self.MAX_LINES:
lines.append(line.rstrip('\n'))
else:
truncated = True
break
content = '\n'.join(lines)
if truncated:
content += f"\n\n[File truncated after {self.MAX_LINES} lines]"
# Detect language
language = self._detect_language(file_path)
return {
"content": [{
"type": "text",
"text": f"File: {file_path}\n"
f"Language: {language}\n"
f"Size: {stats.st_size} bytes\n"
f"Total lines: {line_count}\n\n"
f"{content}"
}]
}
except UnicodeDecodeError:
return {
"content": [{
"type": "text",
"text": f"Error: File {file_path} appears to be a binary file"
}],
"isError": True
}
except Exception as error:
return {
"content": [{
"type": "text",
"text": f"Error reading file: {str(error)}"
}],
"isError": True
}
class CodeAnalysisServer(FastMCP):
def __init__(self, name: str):
# super().__init__(name)
# First, call the parent class constructor with capabilities
super().__init__(
name,
capabilities={
"prompts": {} # Enable prompts capability
}
)
self.repo_path: Optional[Path] = None
self.analyzer: Optional[RepoStructureAnalyzer] = None
self.file_reader: Optional[FileReader] = None
# Add prompts capability
# self.capabilities["prompts"] = {}
def initialize_repo(self, path: str) -> None:
"""Initialize the repository path and analysis tools."""
if not path or path in (".", "./"):
raise ValueError("Repository path must be an absolute path")
repo_path = Path(path).resolve()
if not repo_path.is_absolute():
raise ValueError(f"Repository path must be absolute, got: {repo_path}")
if not repo_path.exists():
raise ValueError(f"Repository path does not exist: {repo_path}")
if not repo_path.is_dir():
raise ValueError(f"Repository path is not a directory: {repo_path}")
self.repo_path = repo_path
self.analyzer = RepoStructureAnalyzer(self.repo_path)
self.file_reader = FileReader(self.repo_path)
# Initialize server
mcp = CodeAnalysisServer("code-analysis")
@mcp.tool()
async def initialize_repository(path: str) -> str:
"""Initialize the repository path for future code analysis operations.
Args:
path: Path to the repository root directory that contains the code to analyze
"""
try:
mcp.initialize_repo(path)
gitignore_path = Path(path) / '.gitignore'
gitignore_status = "Found .gitignore file" if gitignore_path.exists() else "No .gitignore file present"
return f"Successfully initialized code repository at: {mcp.repo_path}\n{gitignore_status}"
except ValueError as e:
return f"Error initializing code repository: {str(e)}"
@mcp.tool()
async def get_repo_info() -> str:
"""Get information about the currently initialized code repository."""
if not mcp.repo_path:
return "No code repository has been initialized yet. Please use initialize_repository first."
gitignore_path = mcp.repo_path / '.gitignore'
gitignore_status = "Found .gitignore file" if gitignore_path.exists() else "No .gitignore file present"
return f"""Code Repository Information:
Path: {mcp.repo_path}
Exists: {mcp.repo_path.exists()}
Is Directory: {mcp.repo_path.is_dir()}
{gitignore_status}"""
@mcp.tool()
async def get_repo_structure(sub_path: Optional[str] = None, depth: Optional[int] = None) -> str:
"""Get the structure of files and directories in the repository.
Args:
sub_path: Optional subdirectory path relative to repository root
depth: Optional maximum depth to traverse (default is 3)
"""
if not mcp.repo_path or not mcp.analyzer:
return "No code repository has been initialized yet. Please use initialize_repository first."
try:
target_path = mcp.repo_path
if sub_path:
target_path = mcp.repo_path / sub_path
if not mcp.analyzer._is_safe_path(target_path):
return "Error: Invalid path - directory traversal not allowed"
structure = mcp.analyzer.get_structure(
target_path,
sub_path or '',
max_depth=depth
)
return mcp.analyzer.format_structure(structure)
except Exception as e:
return f"Error analyzing repository structure: {str(e)}"
@mcp.tool()
async def read_file(file_path: str) -> str:
"""Read and display the contents of a file from the repository.
Args:
file_path: Path to the file relative to repository root
"""
if not mcp.repo_path or not mcp.file_reader or not mcp.analyzer:
return "No code repository has been initialized yet. Please use initialize_repository first."
try:
# Check if file should be ignored based on gitignore patterns
if mcp.analyzer.should_ignore(file_path):
return f"File {file_path} is ignored based on .gitignore patterns"
result = mcp.file_reader.read_file(file_path)
if result.get("isError", False):
return result["content"][0]["text"]
return result["content"][0]["text"]
except Exception as e:
return f"Error reading file: {str(e)}"
@mcp.prompt()
def analyze_code_repository(codebase_path: str) -> list[UserMessage | AssistantMessage]:
"""Analyze a code repository at the specified path.
Args:
codebase_path: Absolute path to the code repository
"""
return [
UserMessage(f"""You are an AI assistant specialized in codebase analysis, operating as part of an MCP server named code-analysis. Your task is to analyze codebases and answer user questions about them using a set of specialized tools.
The codebase we are going to analyze is located at {codebase_path}
The user will ask specific questions about this codebase. To answer the user's questions, follow these steps:
1. Initialize Repository:
- Use the `initialize_repository(path: str) -> str` tool with the full path to the repository root directory.
- This step is required before using any other tools.
2. Verify Initialization:
- Use the `get_repo_info() -> str` tool to confirm successful initialization.
- This will show the path, existence verification, and .gitignore status.
3. Get Repository Structure:
- Use the `get_repo_structure(sub_path?: str, depth?: int) -> str` tool to generate a tree view of the repository's file structure.
- Start with the default depth for an overview, then use sub_path to explore specific directories of interest.
- Increase depth only for detailed investigation of specific areas.
4. Read Files:
- Use the `read_file(file_path: str) -> str` tool to read and display file contents with syntax recognition.
- This tool is limited to files under 1MB and 1000 lines.
- Start with README files and other documentation to gain initial context.
5. Systematic Investigation:
- Generate an initial hypothesis about the system based on the repository structure and documentation.
- Use the tools strategically to explore the codebase, focusing on areas relevant to the user's question.
- Continuously update your understanding as you gather more information.
- Use the `memory` and/or `sequential-thinking` MCP servers if available
6. Evidence-Based Analysis:
- Support all claims with concrete evidence from the codebase.
- Clearly distinguish between directly verified code, inferred patterns, and areas requiring further investigation.
7. Comprehensive Analysis Presentation:
Present your findings in the following format:
a. Initial System Hypothesis
b. Investigation Methodology
c. Discovered System Characteristics
d. Supporting Evidence
e. Remaining Uncertainties
f. Final Answer to User's Question
Throughout your analysis, document your thought process inside <investigation_log> tags. For each step:
- State the current focus or question you're addressing.
- List the potential tools you could use and explain your choice.
- Document the results of each tool use, quoting relevant code snippets or file contents.
- Explain your reasoning when forming hypotheses or drawing conclusions.
- Summarize your findings periodically throughout the investigation.
Be sure to use the available tools appropriately and document any limitations or errors encountered. It's okay for this section to be quite long.
Remember:
- You are operating in the context of an MCP server named code-analysis.
- Always use the tools provided and do not assume access to any other capabilities.
- If you encounter any errors or limitations with the tools, clearly state them in your analysis.
- Maintain a systematic and evidence-based approach throughout your investigation.
Now, you are ready to begin your analysis of the codebase. Please do the necessary steps to initialize.
Let me know when you are ready and I will provide the question I want to investigate.
"""),
# AssistantMessage("""I'll help you analyze this codebase. First, let me initialize the repository to get started. I will then present
# my initial insights and ask you the question that you want to investigate.
# """)
]
if __name__ == "__main__":
# Initialize and run the server
mcp.run(transport='stdio')