MCP Code Analyzer
by emiryasar
mcp_code_analyzer/tools
import logging
import os
from pathlib import Path
from typing import Dict, Any, List, Optional
from datetime import datetime
import re
from dataclasses import dataclass
import fnmatch
from .base import BaseTool
logger = logging.getLogger(__name__)
@dataclass
class SearchResult:
"""Container for search results"""
path: str
match_type: str # file, content, pattern
line_number: Optional[int] = None
content: Optional[str] = None
context: Optional[Dict[str, Any]] = None
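# Example instance (illustrative only):
# SearchResult(path="src/app.py", match_type="content", line_number=42,
#              content="import os")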
class PathFinder(BaseTool):
"""Advanced file and directory search tool"""
async def execute(self, arguments: Dict[str, Any]) -> Dict[str, Any]:
search_path = arguments.get('path', '.')
operation = arguments.get('operation', 'find')
operations = {
'find': self._find_files,
'glob': self._glob_search,
'pattern': self._pattern_search,
'recent': self._find_recent
}
if operation not in operations:
return {"error": f"Unknown operation: {operation}"}
try:
result = await operations[operation](Path(search_path), arguments)
return {"success": True, "data": result}
except Exception as e:
logger.error(f"PathFinder operation failed: {e}")
return {"success": False, "error": str(e)}
async def _find_files(self, path: Path, args: Dict[str, Any]) -> Dict[str, Any]:
"""Find files based on criteria"""
filters = args.get('filters', {})
max_depth = args.get('max_depth', None)
exclude_patterns = set(args.get('exclude', []))
results = []
total_scanned = 0
try:
for root, dirs, files in self._walk_with_depth(path, max_depth):
# Apply directory exclusions
dirs[:] = [d for d in dirs if not any(
fnmatch.fnmatch(d, pattern) for pattern in exclude_patterns
)]
for file in files:
total_scanned += 1
file_path = Path(root) / file
if self._should_skip(file_path):
continue
if self._matches_filters(file_path, filters):
stat = file_path.stat()
results.append({
"path": str(file_path),
"name": file_path.name,
"extension": file_path.suffix,
"size": stat.st_size,
"modified": datetime.fromtimestamp(stat.st_mtime).isoformat(),
"created": datetime.fromtimestamp(stat.st_ctime).isoformat()
})
return {
"results": results,
"summary": {
"total_found": len(results),
"total_scanned": total_scanned,
"search_path": str(path)
}
}
except Exception as e:
raise RuntimeError(f"File search failed: {e}")
async def _glob_search(self, path: Path, args: Dict[str, Any]) -> Dict[str, Any]:
"""Search using glob patterns"""
patterns = args.get('patterns', ['*'])
recursive = args.get('recursive', True)
results = []
total_matches = 0
try:
for pattern in patterns:
if recursive:
matches = path.rglob(pattern)
else:
matches = path.glob(pattern)
for match in matches:
if self._should_skip(match):
continue
stat = match.stat()
results.append({
"path": str(match),
"pattern": pattern,
"type": "directory" if match.is_dir() else "file",
"size": stat.st_size if match.is_file() else None,
"modified": datetime.fromtimestamp(stat.st_mtime).isoformat()
})
total_matches += 1
return {
"results": results,
"summary": {
"patterns": patterns,
"total_matches": total_matches,
"search_path": str(path)
}
}
except Exception as e:
raise RuntimeError(f"Glob search failed: {e}")
async def _pattern_search(self, path: Path, args: Dict[str, Any]) -> Dict[str, Any]:
"""Search for files matching complex patterns"""
pattern_rules = args.get('rules', {})
max_results = args.get('max_results', None)
results = []
try:
for file_path in self._recursive_search(path):
if self._should_skip(file_path):
continue
if self._matches_pattern_rules(file_path, pattern_rules):
stat = file_path.stat()
results.append({
"path": str(file_path),
"name": file_path.name,
"matches": self._get_matching_rules(file_path, pattern_rules),
"size": stat.st_size,
"modified": datetime.fromtimestamp(stat.st_mtime).isoformat()
})
if max_results and len(results) >= max_results:
break
return {
"results": results,
"summary": {
"total_matches": len(results),
"rules_applied": list(pattern_rules.keys()),
"search_path": str(path)
}
}
except Exception as e:
raise RuntimeError(f"Pattern search failed: {e}")
async def _find_recent(self, path: Path, args: Dict[str, Any]) -> Dict[str, Any]:
"""Find recently modified files"""
hours = args.get('hours', 24)
file_types = set(args.get('file_types', []))
min_size = args.get('min_size', 0)
max_size = args.get('max_size', float('inf'))
results = []
cutoff_time = datetime.now().timestamp() - (hours * 3600)
try:
for file_path in self._recursive_search(path):
if self._should_skip(file_path):
continue
stat = file_path.stat()
if stat.st_mtime >= cutoff_time:
if not file_types or file_path.suffix in file_types:
if min_size <= stat.st_size <= max_size:
results.append({
"path": str(file_path),
"name": file_path.name,
"size": stat.st_size,
"modified": datetime.fromtimestamp(stat.st_mtime).isoformat(),
"hours_ago": (datetime.now().timestamp() - stat.st_mtime) / 3600
})
# Sort by modification time
results.sort(key=lambda x: x["modified"], reverse=True)
return {
"results": results,
"summary": {
"total_found": len(results),
"time_range_hours": hours,
"search_path": str(path)
}
}
except Exception as e:
raise RuntimeError(f"Recent files search failed: {e}")
def _walk_with_depth(self, path: Path, max_depth: Optional[int] = None):
"""Walk directory tree with optional depth limit"""
base_depth = len(path.parents)
# Use os.walk rather than Path.walk so this also runs on Python < 3.12
for root, dirs, files in os.walk(path):
current_depth = len(Path(root).parents) - base_depth
if max_depth is not None and current_depth > max_depth:
dirs.clear()
else:
yield root, dirs, files
def _matches_filters(self, path: Path, filters: Dict[str, Any]) -> bool:
"""Check if file matches all filters"""
try:
stat = path.stat()
for key, value in filters.items():
if key == 'extension' and path.suffix != value:
return False
elif key == 'name' and path.name != value:
return False
elif key == 'min_size' and stat.st_size < value:
return False
elif key == 'max_size' and stat.st_size > value:
return False
elif key == 'modified_after' and stat.st_mtime < value:
return False
elif key == 'modified_before' and stat.st_mtime > value:
return False
return True
except Exception:
return False
def _matches_pattern_rules(self, path: Path, rules: Dict[str, Any]) -> bool:
"""Check if file matches pattern rules"""
try:
for rule_type, pattern in rules.items():
if rule_type == 'name_pattern':
if not fnmatch.fnmatch(path.name, pattern):
return False
elif rule_type == 'path_pattern':
if not fnmatch.fnmatch(str(path), pattern):
return False
elif rule_type == 'regex':
if not re.search(pattern, str(path)):
return False
return True
except Exception:
return False
def _get_matching_rules(self, path: Path, rules: Dict[str, Any]) -> List[str]:
"""Get list of matching rules for a file"""
matches = []
for rule_type, pattern in rules.items():
if rule_type == 'name_pattern' and fnmatch.fnmatch(path.name, pattern):
matches.append(rule_type)
elif rule_type == 'path_pattern' and fnmatch.fnmatch(str(path), pattern):
matches.append(rule_type)
elif rule_type == 'regex' and re.search(pattern, str(path)):
matches.append(rule_type)
return matches
def _recursive_search(self, path: Path) -> List[Path]:
"""Recursively search directory"""
try:
return list(path.rglob('*'))
except Exception:
return []
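# Illustrative usage sketch (not part of the original tools): one way a caller
# might drive PathFinder. It assumes BaseTool needs no constructor arguments
# and that the argument keys mirror the ones read in execute() above; the "src"
# path and the filter values are placeholders.
async def _example_pathfinder_find() -> Dict[str, Any]:
    finder = PathFinder()
    return await finder.execute({
        "operation": "find",
        "path": "src",
        "max_depth": 3,
        "exclude": ["__pycache__", ".git"],
        "filters": {"extension": ".py", "min_size": 1},
    })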
class ContentScanner(BaseTool):
"""Advanced content search and analysis tool"""
def __init__(self):
super().__init__()
self._file_cache = {}
self.max_workers = 4
async def execute(self, arguments: Dict[str, Any]) -> Dict[str, Any]:
operation = arguments.get('operation', 'search')
target_path = arguments.get('path', '.')
operations = {
'search': self._search_content,
'analyze': self._analyze_content,
'regex': self._regex_search,
'similar': self._find_similar
}
if operation not in operations:
return {"error": f"Unknown operation: {operation}"}
try:
result = await operations[operation](Path(target_path), arguments)
return {"success": True, "data": result}
except Exception as e:
logger.error(f"ContentScanner operation failed: {e}")
return {"success": False, "error": str(e)}
async def _search_content(self, path: Path, args: Dict[str, Any]) -> Dict[str, Any]:
"""Search file contents for text"""
search_text = args.get('text')
case_sensitive = args.get('case_sensitive', False)
whole_word = args.get('whole_word', False)
file_pattern = args.get('file_pattern', '*')
if not search_text:
return {"error": "Search text is required"}
results = []
total_files = 0
matches_found = 0
try:
# Prepare search pattern
if whole_word:
pattern = r'\b' + re.escape(search_text) + r'\b'
else:
pattern = re.escape(search_text)
flags = 0 if case_sensitive else re.IGNORECASE
regex = re.compile(pattern, flags)
# Search files
for file_path in path.rglob(file_pattern):
if self._should_skip(file_path) or not file_path.is_file():
continue
total_files += 1
try:
matches = await self._find_matches(file_path, regex)
if matches:
matches_found += len(matches)
results.append({
"file": str(file_path),
"matches": matches
})
except Exception as e:
logger.error(f"Error searching {file_path}: {e}")
return {
"results": results,
"summary": {
"total_files_searched": total_files,
"files_with_matches": len(results),
"total_matches": matches_found,
"search_pattern": {
"text": search_text,
"case_sensitive": case_sensitive,
"whole_word": whole_word
}
}
}
except Exception as e:
raise RuntimeError(f"Content search failed: {e}")
async def _analyze_content(self, path: Path, args: Dict[str, Any]) -> Dict[str, Any]:
"""Analyze file contents"""
file_pattern = args.get('file_pattern', '*')
analysis_types = set(args.get('types', ['duplicates', 'statistics', 'patterns']))
try:
analysis_results = {
"files_analyzed": 0,
"total_size": 0,
"analysis": {}
}
if 'duplicates' in analysis_types:
analysis_results["analysis"]["duplicates"] = await self._find_duplicate_content(path, file_pattern)
if 'statistics' in analysis_types:
analysis_results["analysis"]["statistics"] = await self._generate_content_statistics(path, file_pattern)
if 'patterns' in analysis_types:
analysis_results["analysis"]["patterns"] = await self._analyze_content_patterns(path, file_pattern)
return analysis_results
except Exception as e:
raise RuntimeError(f"Content analysis failed: {e}")
async def _regex_search(self, path: Path, args: Dict[str, Any]) -> Dict[str, Any]:
"""Search using regular expressions"""
pattern = args.get('pattern')
file_pattern = args.get('file_pattern', '*')
multiline = args.get('multiline', False)
if not pattern:
return {"error": "Regex pattern is required"}
try:
flags = re.MULTILINE if multiline else 0
regex = re.compile(pattern, flags)
results = []
files_searched = 0
for file_path in path.rglob(file_pattern):
    if self._should_skip(file_path) or not file_path.is_file():
        continue
    files_searched += 1
matches = await self._find_matches(file_path, regex)
if matches:
results.append({
"file": str(file_path),
"matches": matches
})
return {
"results": results,
"summary": {
"total_files_searched": len(list(path.rglob(file_pattern))),
"files_with_matches": len(results),
"pattern": pattern,
"multiline": multiline
}
}
except Exception as e:
raise RuntimeError(f"Regex search failed: {e}")
async def _find_similar(self, path: Path, args: Dict[str, Any]) -> Dict[str, Any]:
"""Find files with similar content"""
threshold = args.get('similarity_threshold', 0.8)
file_pattern = args.get('file_pattern', '*')
min_size = args.get('min_size', 0)
try:
file_groups = []
content_hashes = {}
# First pass: collect file contents
for file_path in path.rglob(file_pattern):
if self._should_skip(file_path) or not file_path.is_file():
continue
if file_path.stat().st_size < min_size:
continue
try:
content = await self._read_file_content(file_path)
if content:
content_hashes[str(file_path)] = self._calculate_similarity_hash(content)
except Exception as e:
logger.error(f"Error reading {file_path}: {e}")
# Second pass: compare files
analyzed_files = set()
for file1, hash1 in content_hashes.items():
if file1 in analyzed_files:
continue
similar_files = []
for file2, hash2 in content_hashes.items():
if file1 != file2 and file2 not in analyzed_files:
similarity = self._calculate_hash_similarity(hash1, hash2)
if similarity >= threshold:
similar_files.append({
"path": file2,
"similarity": similarity
})
analyzed_files.add(file2)
if similar_files:
analyzed_files.add(file1)
file_groups.append({
"base_file": file1,
"similar_files": similar_files
})
return {
"groups": file_groups,
"summary": {
"total_files": len(content_hashes),
"similarity_groups": len(file_groups),
"threshold": threshold
}
}
except Exception as e:
raise RuntimeError(f"Similarity analysis failed: {e}")
async def _find_matches(self, file_path: Path, pattern: re.Pattern) -> List[Dict[str, Any]]:
"""Find pattern matches in file"""
matches = []
try:
content = await self._read_file_content(file_path)
if not content:
return matches
for i, line in enumerate(content.splitlines(), 1):
for match in pattern.finditer(line):
matches.append({
"line": i,
"start": match.start(),
"end": match.end(),
"text": match.group(),
"context": self._get_line_context(content.splitlines(), i)
})
except Exception as e:
logger.error(f"Error finding matches in {file_path}: {e}")
return matches
async def _find_duplicate_content(self, path: Path, pattern: str) -> Dict[str, Any]:
"""Find duplicate content across files"""
content_map = {}
duplicates = []
try:
for file_path in path.rglob(pattern):
if self._should_skip(file_path) or not file_path.is_file():
continue
content = await self._read_file_content(file_path)
if not content:
continue
content_hash = self._calculate_content_hash(content)
if content_hash in content_map:
    # Found a duplicate: attach it to the existing group for this original,
    # creating the group on the first duplicate hit
    original = content_map[content_hash]
    group = next((g for g in duplicates if g["original"] == original), None)
    if group is None:
        group = {"original": original, "duplicates": []}
        duplicates.append(group)
    group["duplicates"].append(str(file_path))
else:
    content_map[content_hash] = str(file_path)
return {
"duplicate_groups": duplicates,
"total_duplicates": sum(len(group["duplicates"]) for group in duplicates)
}
except Exception as e:
logger.error(f"Error finding duplicates: {e}")
return {"error": str(e)}
async def _generate_content_statistics(self, path: Path, pattern: str) -> Dict[str, Any]:
"""Generate statistics about file contents"""
stats = {
"total_files": 0,
"total_lines": 0,
"total_size": 0,
"average_line_length": 0,
"file_types": {},
"encoding_types": {},
"line_endings": {
"unix": 0,
"windows": 0,
"mixed": 0
}
}
try:
line_lengths = []
for file_path in path.rglob(pattern):
if self._should_skip(file_path) or not file_path.is_file():
continue
stats["total_files"] += 1
stats["total_size"] += file_path.stat().st_size
# Track file types
ext = file_path.suffix
stats["file_types"][ext] = stats["file_types"].get(ext, 0) + 1
content = await self._read_file_content(file_path)
if not content:
continue
lines = content.splitlines()
stats["total_lines"] += len(lines)
line_lengths.extend(len(line) for line in lines)
# Detect line endings
if '\r\n' in content and '\n' in content.replace('\r\n', ''):
stats["line_endings"]["mixed"] += 1
elif '\r\n' in content:
stats["line_endings"]["windows"] += 1
else:
stats["line_endings"]["unix"] += 1
# Track encoding
encoding = self._detect_encoding(file_path)
stats["encoding_types"][encoding] = stats["encoding_types"].get(encoding, 0) + 1
if line_lengths:
stats["average_line_length"] = sum(line_lengths) / len(line_lengths)
return stats
except Exception as e:
logger.error(f"Error generating statistics: {e}")
return {"error": str(e)}
async def _analyze_content_patterns(self, path: Path, pattern: str) -> Dict[str, Any]:
"""Analyze content for common patterns"""
patterns = {
"common_words": {},
"line_patterns": [],
"structure_patterns": []
}
try:
word_freq = {}
line_patterns = set()
for file_path in path.rglob(pattern):
if self._should_skip(file_path) or not file_path.is_file():
continue
content = await self._read_file_content(file_path)
if not content:
continue
# Analyze words
words = re.findall(r'\w+', content.lower())
for word in words:
word_freq[word] = word_freq.get(word, 0) + 1
# Analyze line patterns
lines = content.splitlines()
for line in lines:
# Find repeating patterns
pattern_match = re.match(r'^(\s*)(.+?)(\s*)$', line)
if pattern_match:
# Unpack into names that do not shadow the file-level `content` variable
indent, line_body, trailing = pattern_match.groups()
if len(indent) > 0:
line_patterns.add(f"indent:{len(indent)}")
# Analyze structure patterns
if file_path.suffix == '.py':
await self._analyze_python_patterns(content, patterns)
# Process word frequencies
patterns["common_words"] = dict(sorted(
word_freq.items(),
key=lambda x: x[1],
reverse=True
)[:100])
patterns["line_patterns"] = list(line_patterns)
return patterns
except Exception as e:
logger.error(f"Error analyzing patterns: {e}")
return {"error": str(e)}
async def _read_file_content(self, path: Path) -> Optional[str]:
"""Read file content with caching"""
if str(path) in self._file_cache:
return self._file_cache[str(path)]
try:
content = path.read_text(encoding=self._detect_encoding(path))
self._file_cache[str(path)] = content
return content
except Exception as e:
logger.error(f"Error reading {path}: {e}")
return None
def _detect_encoding(self, path: Path) -> str:
"""Detect file encoding"""
try:
import chardet
with open(path, 'rb') as f:
raw = f.read()
result = chardet.detect(raw)
return result['encoding'] or 'utf-8'
except Exception:
return 'utf-8'
def _calculate_content_hash(self, content: str) -> str:
"""Calculate hash of content"""
import hashlib
return hashlib.md5(content.encode()).hexdigest()
def _calculate_similarity_hash(self, content: str) -> List[int]:
"""Calculate similarity hash for content"""
# Simplified implementation of similarity hashing
words = content.split()
return [hash(word) for word in words]
def _calculate_hash_similarity(self, hash1: List[int], hash2: List[int]) -> float:
"""Calculate similarity between two hashes"""
if not hash1 or not hash2:
    return 0.0
common = set(hash1) & set(hash2)
return len(common) / max(len(hash1), len(hash2))
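# Note on the similarity measure above: _calculate_similarity_hash builds a bag
# of per-word hashes, and _calculate_hash_similarity scores overlap as
# |set(hash1) & set(hash2)| / max(len(hash1), len(hash2)), a rough Jaccard-style
# ratio in [0, 1]; files sharing most of their vocabulary score close to 1.0.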
def _get_line_context(self, lines: List[str], line_number: int, context_lines: int = 2) -> Dict[str, List[str]]:
"""Get context lines around a match"""
start = max(0, line_number - context_lines - 1)
end = min(len(lines), line_number + context_lines)
return {
"before": lines[start:line_number-1],
"after": lines[line_number:end]
}
async def _analyze_python_patterns(self, content: str, patterns: Dict[str, Any]) -> None:
"""Analyze Python-specific patterns"""
import ast
try:
tree = ast.parse(content)
# Analyze structure patterns
class_patterns = []
function_patterns = []
for node in ast.walk(tree):
if isinstance(node, ast.ClassDef):
methods = len([n for n in node.body if isinstance(n, ast.FunctionDef)])
class_patterns.append(f"class_with_{methods}_methods")
elif isinstance(node, ast.FunctionDef):
args = len(node.args.args)
function_patterns.append(f"function_with_{args}_args")
if class_patterns:
patterns["structure_patterns"].extend(class_patterns)
if function_patterns:
patterns["structure_patterns"].extend(function_patterns)
except Exception as e:
logger.error(f"Error analyzing Python patterns: {e}")