"""MCP Code Analyzer — code modification tools.

Author: emiryasar
Package: mcp_code_analyzer.tools
"""
import json
import os
# NOTE(review): this looks like an accidental IDE auto-import — the
# module-level name `encoding` is never read below (every later
# `encoding` is a local variable or a parameter), and importing
# idlelib can drag in Tk-related modules on some installs. Candidate
# for removal once confirmed nothing external depends on it.
from idlelib.iomenu import encoding
from pathlib import Path
# NOTE(review): `json`, `re` and `Tuple` appear unused in this module.
from typing import Dict, Any, Optional, List, Set, Union, Tuple
from datetime import datetime
import logging
import shutil
import ast
import re
from dataclasses import dataclass
import tempfile
import hashlib
from enum import Enum, auto

# Module-level logger named after the module, per logging convention.
logger = logging.getLogger(__name__)
class ChangeType(Enum):
    """Types of code changes recorded in a CodeChange."""
    MODIFY = auto()    # in-place replacement of a line range (the only value modify_code records)
    INSERT = auto()    # insertion; note insert_code delegates to modify_code, which records MODIFY
    DELETE = auto()    # deletion; note delete_code delegates to modify_code, which records MODIFY
    REFACTOR = auto()  # reserved: no operation in this module produces it yet
class AnalysisType(Enum):
    """Categories of impact analysis iterated by _analyze_impact."""
    SYNTAX = auto()        # AST node-type set diff (_analyze_syntax_changes)
    IMPORTS = auto()       # imported-name diff (_analyze_import_changes)
    FUNCTIONS = auto()     # function signature diff (_analyze_function_changes)
    CLASSES = auto()       # class structure diff (_analyze_class_changes)
    VARIABLES = auto()     # assignment diff (_analyze_variable_changes)
    DEPENDENCIES = auto()  # no handler in _analyze_specific_impact; currently yields nothing
@dataclass
class AffectedCode:
    """A location elsewhere in the project impacted by a code change."""
    file_path: str                       # file containing the affected code
    line_range: Dict[str, int]           # {'start': ..., 'end': ...}; 1-based AST line numbers
    change_type: str                     # e.g. 'syntax', 'import', 'function_call', 'inheritance'
    reason: str                          # human-readable explanation of the impact
    suggested_action: str                # recommended follow-up for the maintainer
    severity: str  # 'high', 'medium', 'low'
    reference_type: str  # 'import', 'function', 'class', 'variable'
    original_code: Optional[str] = None  # unparsed snippet of the affected node, when available
@dataclass
class CodeChange:
    """Details of a single applied code change."""
    file_path: str                     # file that was modified
    change_type: ChangeType            # kind of change (modify_code always records MODIFY)
    section: Dict[str, int]            # {'start': ..., 'end': ...}; zero-based, end-exclusive lines
    original_content: str              # text of the replaced section
    new_content: str                   # replacement text
    affected_code: List[AffectedCode]  # impact-analysis results for this change
    metadata: Dict[str, Any]           # timestamp, backup path, optional description
    hash: str = ""                     # short fingerprint, filled in __post_init__

    def __post_init__(self):
        # 12-hex-char SHA-256 prefix over location + before/after text;
        # surfaced as 'change_hash' in ModificationResult details.
        self.hash = hashlib.sha256(
            f"{self.file_path}:{self.section}:{self.original_content}:{self.new_content}"
            .encode()
        ).hexdigest()[:12]
@dataclass
class ModificationResult:
    """Result of a code modification operation."""
    success: bool                                       # True when the change was applied
    change: Optional[CodeChange] = None                 # record of the applied change
    backup_path: Optional[str] = None                   # backup created before modification
    error: Optional[str] = None                         # failure description when success is False
    validation_errors: Optional[List[str]] = None       # blocking issues reported by validation
    affected_code: Optional[List[AffectedCode]] = None  # impact-analysis results
    details: Optional[Dict[str, Any]] = None            # extras: modified_lines, change_hash, ...
@dataclass
class ValidationResult:
    """Result of validating a proposed code modification."""
    valid: bool              # True when `errors` is empty
    errors: List[str]        # blocking problems (e.g. syntax errors)
    warnings: List[str]      # non-blocking advisories (size, emptiness, indentation)
    details: Dict[str, Any]  # machine-readable extras keyed per check
class ModificationError(Exception):
    """Base error for code-modification failures.

    Carries the failure message plus, when available, the path of a
    backup taken before the failed operation so callers can restore it.
    """

    def __init__(self, message: str, backup_path: Optional[str] = None):
        super().__init__(message)
        self.message = message
        self.backup_path = backup_path
class ValidationError(ModificationError):
    """Error during code validation (raised when checks or syntax fail)."""
    pass
class BackupError(ModificationError):
    """Error during backup operations (defined for callers; not raised in this module)."""
    pass
class CodeModifier:
    """Advanced code modification tool with impact analysis."""

    def __init__(self, base_path: Optional[Union[str, Path]] = None):
        """Root the modifier at *base_path* (cwd by default) and create its directories."""
        self._base_path = Path(base_path) if base_path else Path.cwd()
        # Backups are grouped into one directory per calendar day.
        today = datetime.now().strftime('%Y%m%d')
        self._backup_dir = self._base_path / "backups" / today
        self._temp_dir = self._base_path / "temp"
        self._change_history: List[CodeChange] = []
        self._affected_files: Set[str] = set()
        self._cached_asts: Dict[str, ast.AST] = {}
        self._setup_directories()

    def _setup_directories(self) -> None:
        """Create the backup and temp directories if they are missing."""
        try:
            for required_dir in (self._backup_dir, self._temp_dir):
                required_dir.mkdir(parents=True, exist_ok=True)
        except Exception as e:
            logger.error(f"Failed to create directories: {e}")
            raise ModificationError("Failed to initialize directory structure")
async def modify_code(
self,
file_path: str,
section: Dict[str, int],
new_content: str,
description: Optional[str] = None,
validate: bool = True
) -> ModificationResult:
"""Modify code with comprehensive analysis and validation"""
try:
path = Path(file_path)
if not self._validate_file(path):
raise ValidationError("Invalid file path or file does not exist")
# Create backup before any modification
backup_path = await self._create_backup(path)
# Read and validate file content
original_content = self._read_file_content(path)
if not original_content:
raise ValidationError("Failed to read file content")
lines = original_content.splitlines(keepends=True)
start_line = section.get('start', 0)
end_line = section.get('end', len(lines))
if not self._validate_range(start_line, end_line, len(lines)):
raise ValidationError("Invalid line range")
# Extract original section content
original_section = ''.join(lines[start_line:end_line])
# Validate changes if requested
if validate:
validation = self._validate_modification(
path, original_section, new_content
)
if not validation.valid:
return ModificationResult(
success=False,
error="Validation failed",
validation_errors=validation.errors
)
# Find affected code
affected_code = await self._analyze_impact(
path, original_section, new_content, section
)
# Prepare modification
new_lines = lines.copy()
new_content_lines = new_content.splitlines(keepends=True)
new_lines[start_line:end_line] = new_content_lines
# Create temporary file for modification
with tempfile.NamedTemporaryFile(mode='w', delete=False) as tmp:
tmp.writelines(new_lines)
temp_path = Path(tmp.name)
# Validate syntax if it's a Python file
if path.suffix == '.py':
try:
with open(temp_path, 'r') as f:
compile(f.read(), temp_path, 'exec')
except SyntaxError as e:
temp_path.unlink()
raise ValidationError(f"Syntax error in modified code: {str(e)}")
# Apply changes
shutil.move(temp_path, path)
# Create change record
change = CodeChange(
file_path=str(path),
change_type=ChangeType.MODIFY,
section=section,
original_content=original_section,
new_content=new_content,
affected_code=affected_code,
metadata={
'timestamp': datetime.now().isoformat(),
'backup_path': str(backup_path),
'description': description
}
)
# Update history
self._change_history.append(change)
self._affected_files.update(f['file_path'] for f in affected_code)
return ModificationResult(
success=True,
change=change,
backup_path=str(backup_path),
affected_code=affected_code,
details={
'modified_lines': end_line - start_line,
'affected_files': len(self._affected_files),
'change_hash': change.hash
}
)
except (ValidationError, ModificationError) as e:
logger.error(f"Modification error: {e.message}")
return ModificationResult(
success=False,
error=e.message,
backup_path=getattr(e, 'backup_path', None)
)
except Exception as e:
logger.error(f"Unexpected error during modification: {e}")
if 'backup_path' in locals():
try:
self._restore_backup(backup_path, path)
return ModificationResult(
success=False,
error=f"Error: {str(e)}, backup restored",
backup_path=str(backup_path)
)
except Exception as restore_error:
logger.error(f"Failed to restore backup: {restore_error}")
return ModificationResult(success=False, error=str(e))
async def insert_code(
self,
file_path: str,
line_number: int,
content: str,
description: Optional[str] = None,
validate: bool = True
) -> ModificationResult:
"""Insert code at specific line"""
return await self.modify_code(
file_path,
{"start": line_number, "end": line_number},
content,
description=description,
validate=validate
)
async def delete_code(
self,
file_path: str,
start_line: int,
end_line: int,
description: Optional[str] = None,
validate: bool = True
) -> ModificationResult:
"""Delete code section with impact analysis"""
return await self.modify_code(
file_path,
{"start": start_line, "end": end_line},
"",
description=description,
validate=validate
)
async def _analyze_impact(
self,
file_path: Path,
original_content: str,
new_content: str,
section: Dict[str, int]
) -> List[AffectedCode]:
"""Analyze impact of code changes"""
affected_code = []
# Analyze different aspects
for analysis_type in AnalysisType:
try:
affected = await self._analyze_specific_impact(
analysis_type,
file_path,
original_content,
new_content,
section
)
affected_code.extend(affected)
except Exception as e:
logger.error(f"Error in {analysis_type} analysis: {e}")
return affected_code
async def _analyze_specific_impact(
self,
analysis_type: AnalysisType,
file_path: Path,
original_content: str,
new_content: str,
section: Dict[str, int]
) -> List[AffectedCode]:
"""Analyze specific type of impact"""
affected = []
if analysis_type == AnalysisType.SYNTAX:
# Syntax changes might affect the entire file
syntax_changes = self._analyze_syntax_changes(
original_content,
new_content
)
if syntax_changes:
affected.append(
AffectedCode(
file_path=str(file_path),
line_range=section,
change_type="syntax",
reason="Syntax structure changed",
suggested_action="Review syntax changes",
severity="high",
reference_type="syntax",
original_code=original_content
)
)
elif analysis_type == AnalysisType.IMPORTS:
# Changes in imports affect dependent files
import_changes = self._analyze_import_changes(
original_content,
new_content
)
for change in import_changes:
affected.extend(await self._find_import_dependents(
file_path, change
))
elif analysis_type == AnalysisType.FUNCTIONS:
# Function signature changes affect callers
func_changes = self._analyze_function_changes(
original_content,
new_content
)
for change in func_changes:
affected.extend(await self._find_function_callers(
file_path, change
))
elif analysis_type == AnalysisType.CLASSES:
# Class changes affect subclasses and usage
class_changes = self._analyze_class_changes(
original_content,
new_content
)
for change in class_changes:
affected.extend(await self._find_class_dependents(
file_path, change
))
elif analysis_type == AnalysisType.VARIABLES:
# Variable changes affect their usage
var_changes = self._analyze_variable_changes(
original_content,
new_content
)
for change in var_changes:
affected.extend(await self._find_variable_usage(
file_path, change
))
return affected
def _analyze_syntax_changes(self, original: str, modified: str) -> List[Dict[str, Any]]:
"""Analyze syntax structure changes"""
changes = []
try:
original_ast = ast.parse(original)
modified_ast = ast.parse(modified)
# Compare AST structures
original_nodes = set(type(node).__name__ for node in ast.walk(original_ast))
modified_nodes = set(type(node).__name__ for node in ast.walk(modified_ast))
if original_nodes != modified_nodes:
changes.append({
'type': 'structure',
'added': modified_nodes - original_nodes,
'removed': original_nodes - modified_nodes
})
except Exception as e:
logger.error(f"Syntax analysis error: {e}")
return changes
def _analyze_import_changes(self, original: str, modified: str) -> List[Dict[str, Any]]:
"""Analyze changes in imports"""
changes = []
try:
original_ast = ast.parse(original)
modified_ast = ast.parse(modified)
def get_imports(tree: ast.AST) -> Set[str]:
imports = set()
for node in ast.walk(tree):
if isinstance(node, ast.Import):
for name in node.names:
imports.add(name.name)
elif isinstance(node, ast.ImportFrom):
if node.module:
for name in node.names:
imports.add(f"{node.module}.{name.name}")
return imports
original_imports = get_imports(original_ast)
modified_imports = get_imports(modified_ast)
if original_imports != modified_imports:
changes.append({
'type': 'imports',
'added': modified_imports - original_imports,
'removed': original_imports - modified_imports
})
except Exception as e:
logger.error(f"Import analysis error: {e}")
return changes
def _analyze_function_changes(self, original: str, modified: str) -> List[Dict[str, Any]]:
"""Analyze function signature changes"""
changes = []
try:
original_ast = ast.parse(original)
modified_ast = ast.parse(modified)
def get_function_info(node: ast.FunctionDef) -> Dict[str, Any]:
return {
'name': node.name,
'args': len(node.args.args),
'defaults': len(node.args.defaults),
'kwonly': len(node.args.kwonlyargs),
'vararg': bool(node.args.vararg),
'kwarg': bool(node.args.kwarg)
}
original_funcs = {
node.name: get_function_info(node)
for node in ast.walk(original_ast)
if isinstance(node, ast.FunctionDef)
}
modified_funcs = {
node.name: get_function_info(node)
for node in ast.walk(modified_ast)
if isinstance(node, ast.FunctionDef)
}
# Find changed functions
for name, info in modified_funcs.items():
if name in original_funcs and original_funcs[name] != info:
changes.append({
'type': 'function',
'name': name,
'original': original_funcs[name],
'modified': info
})
except Exception as e:
logger.error(f"Function analysis error: {e}")
return changes
def _analyze_class_changes(self, original: str, modified: str) -> List[Dict[str, Any]]:
"""Analyze class structure changes"""
changes = []
try:
original_ast = ast.parse(original)
modified_ast = ast.parse(modified)
def get_class_info(node: ast.ClassDef) -> Dict[str, Any]:
return {
'name': node.name,
'bases': [base.id for base in node.bases if isinstance(base, ast.Name)],
'methods': {
n.name: {
'args': len(n.args.args) - 1, # Subtract 'self'
'decorators': [d.id for d in n.decorator_list if isinstance(d, ast.Name)]
}
for n in node.body if isinstance(n, ast.FunctionDef)
},
'properties': [
n.name for n in node.body
if isinstance(n, ast.FunctionDef) and
any(d.id == 'property' for d in n.decorator_list if isinstance(d, ast.Name))
]
}
original_classes = {
node.name: get_class_info(node)
for node in ast.walk(original_ast)
if isinstance(node, ast.ClassDef)
}
modified_classes = {
node.name: get_class_info(node)
for node in ast.walk(modified_ast)
if isinstance(node, ast.ClassDef)
}
# Analyze changes
for name, info in modified_classes.items():
if name in original_classes:
orig_info = original_classes[name]
if orig_info != info:
changes.append({
'type': 'class',
'name': name,
'changes': {
'bases_changed': orig_info['bases'] != info['bases'],
'methods_changed': orig_info['methods'] != info['methods'],
'properties_changed': orig_info['properties'] != info['properties']
},
'original': orig_info,
'modified': info
})
except Exception as e:
logger.error(f"Class analysis error: {e}")
return changes
def _analyze_variable_changes(self, original: str, modified: str) -> List[Dict[str, Any]]:
"""Analyze variable changes including type hints and values"""
changes = []
try:
original_ast = ast.parse(original)
modified_ast = ast.parse(modified)
def get_variable_info(node: ast.AST) -> Dict[str, Any]:
variables = {}
for n in ast.walk(node):
if isinstance(n, ast.AnnAssign) and isinstance(n.target, ast.Name):
variables[n.target.id] = {
'type': 'annotated',
'annotation': ast.unparse(n.annotation) if n.annotation else None,
'value': ast.unparse(n.value) if n.value else None
}
elif isinstance(n, ast.Assign):
for target in n.targets:
if isinstance(target, ast.Name):
variables[target.id] = {
'type': 'standard',
'value': ast.unparse(n.value)
}
return variables
original_vars = get_variable_info(original_ast)
modified_vars = get_variable_info(modified_ast)
# Find changed variables
for name, info in modified_vars.items():
if name in original_vars:
orig_info = original_vars[name]
if orig_info != info:
changes.append({
'type': 'variable',
'name': name,
'original': orig_info,
'modified': info
})
except Exception as e:
logger.error(f"Variable analysis error: {e}")
return changes
    async def _find_import_dependents(
        self,
        file_path: Path,
        import_change: Dict[str, Any]
    ) -> List[AffectedCode]:
        """Find files that depend on changed imports.

        Scans every .py file under the base path (except the modified file
        itself) and flags import statements referencing a name listed in
        ``import_change['removed']``.

        Args:
            file_path: The file whose imports changed; skipped in the scan.
            import_change: Diff dict from _analyze_import_changes, with
                'added'/'removed' sets of dotted names.

        Returns:
            One AffectedCode entry per matching import statement.
        """
        affected = []
        search_path = self._base_path
        for python_file in search_path.rglob('*.py'):
            # NOTE(review): plain Path equality; a symlinked or differently
            # rooted path to the same file would not be skipped — confirm
            # callers always pass paths rooted like _base_path.
            if python_file == file_path:
                continue
            try:
                content = self._read_file_content(python_file)
                if not content:
                    continue
                tree = ast.parse(content)
                for node in ast.walk(tree):
                    if isinstance(node, (ast.Import, ast.ImportFrom)):
                        # Normalize both import forms to dotted names so they
                        # compare against the analyzer's 'removed' entries.
                        imported_names = []
                        if isinstance(node, ast.Import):
                            imported_names.extend(n.name for n in node.names)
                        else:
                            if node.module:
                                imported_names.extend(f"{node.module}.{n.name}" for n in node.names)
                        for removed in import_change.get('removed', []):
                            if removed in imported_names:
                                affected.append(AffectedCode(
                                    file_path=str(python_file),
                                    line_range={'start': node.lineno, 'end': node.end_lineno or node.lineno},
                                    change_type='import',
                                    reason=f"Uses removed import '{removed}'",
                                    suggested_action="Update import statement",
                                    severity="high",
                                    reference_type="import",
                                    original_code=ast.unparse(node)
                                ))
            except Exception as e:
                # Per-file failures (unreadable/unparsable) are logged and
                # skipped so one bad file doesn't abort the whole scan.
                logger.error(f"Error analyzing imports in {python_file}: {e}")
        return affected
    async def _find_function_callers(
        self,
        file_path: Path,
        function_change: Dict[str, Any]
    ) -> List[AffectedCode]:
        """Find call sites of a function whose signature changed.

        Scans sibling .py files under the base path and flags calls to the
        function named in ``function_change`` whose argument count does not
        match the modified signature's positional-arg count.
        """
        affected = []
        search_path = self._base_path
        func_name = function_change['name']
        for python_file in search_path.rglob('*.py'):
            if python_file == file_path:
                continue
            try:
                content = self._read_file_content(python_file)
                if not content:
                    continue
                tree = ast.parse(content)
                for node in ast.walk(tree):
                    if isinstance(node, ast.Call):
                        # Matches both bare calls f() and attribute calls
                        # obj.f(); attribute matching is name-only, so
                        # same-named methods of unrelated objects are
                        # flagged too (heuristic).
                        if (isinstance(node.func, ast.Name) and node.func.id == func_name) or \
                                (isinstance(node.func, ast.Attribute) and node.func.attr == func_name):
                            # Check if call signature matches the changes.
                            # NOTE(review): compares positional+keyword count
                            # against the new positional-arg count only;
                            # defaults, *args and **kwargs make this a rough
                            # heuristic — confirm before acting on it.
                            current_args = len(node.args) + len(node.keywords)
                            new_required_args = function_change['modified']['args']
                            if current_args != new_required_args:
                                affected.append(AffectedCode(
                                    file_path=str(python_file),
                                    line_range={'start': node.lineno, 'end': node.end_lineno or node.lineno},
                                    change_type='function_call',
                                    reason=f"Call to modified function '{func_name}' needs update",
                                    suggested_action=f"Update call signature ({current_args} args to {new_required_args})",
                                    severity="high",
                                    reference_type="function",
                                    original_code=ast.unparse(node)
                                ))
            except Exception as e:
                # Log and continue: one bad file must not abort the scan.
                logger.error(f"Error analyzing function calls in {python_file}: {e}")
        return affected
    async def _find_class_dependents(
        self,
        file_path: Path,
        class_change: Dict[str, Any]
    ) -> List[AffectedCode]:
        """Find code that inherits from or instantiates the modified class.

        Scans sibling .py files for (a) class definitions listing the
        changed class as a base (severity 'high') and (b) bare-name calls
        that instantiate it (severity 'medium').
        """
        affected = []
        search_path = self._base_path
        class_name = class_change['name']
        for python_file in search_path.rglob('*.py'):
            if python_file == file_path:
                continue
            try:
                content = self._read_file_content(python_file)
                if not content:
                    continue
                tree = ast.parse(content)
                for node in ast.walk(tree):
                    if isinstance(node, ast.ClassDef):
                        # Check inheritance (only plain-name bases are seen;
                        # `module.Class` bases are not matched).
                        for base in node.bases:
                            if isinstance(base, ast.Name) and base.id == class_name:
                                affected.append(AffectedCode(
                                    file_path=str(python_file),
                                    line_range={'start': node.lineno, 'end': node.end_lineno or node.lineno},
                                    change_type='inheritance',
                                    reason=f"Inherits from modified class '{class_name}'",
                                    suggested_action="Review class changes and update inheritance",
                                    severity="high",
                                    reference_type="class",
                                    original_code=ast.unparse(node)
                                ))
                    # Check instantiation and method calls
                    # NOTE(review): name-only match — any call to a
                    # same-named callable is flagged, not just true
                    # instantiations of this class.
                    elif isinstance(node, ast.Call):
                        if isinstance(node.func, ast.Name) and node.func.id == class_name:
                            affected.append(AffectedCode(
                                file_path=str(python_file),
                                line_range={'start': node.lineno, 'end': node.end_lineno or node.lineno},
                                change_type='instantiation',
                                reason=f"Instantiates modified class '{class_name}'",
                                suggested_action="Review class changes and update instantiation",
                                severity="medium",
                                reference_type="class",
                                original_code=ast.unparse(node)
                            ))
            except Exception as e:
                # Log and continue: one bad file must not abort the scan.
                logger.error(f"Error analyzing class usage in {python_file}: {e}")
        return affected
async def _find_variable_usage(
self,
file_path: Path,
variable_change: Dict[str, Any]
) -> List[AffectedCode]:
"""Find all uses of the modified variable"""
affected = []
search_path = self._base_path
var_name = variable_change['name']
for python_file in search_path.rglob('*.py'):
if python_file == file_path:
continue
try:
content = self._read_file_content(python_file)
if not content:
continue
tree = ast.parse(content)
for node in ast.walk(tree):
if isinstance(node, ast.Name) and node.id == var_name:
context = "write" if isinstance(node.ctx, ast.Store) else "read"
affected.append(AffectedCode(
file_path=str(python_file),
line_range={'start': node.lineno, 'end': node.end_lineno or node.lineno},
change_type='variable_usage',
reason=f"Uses modified variable '{var_name}' ({context} operation)",
suggested_action=f"Review variable changes and update {context} operation",
severity="medium",
reference_type="variable",
original_code=ast.unparse(node)
))
except Exception as e:
logger.error(f"Error analyzing variable usage in {python_file}: {e}")
return affected
async def _create_backup(self, file_path: Path) -> Path:
"""Create backup with proper encoding"""
timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
backup_path = self._backup_dir / f"{file_path.stem}_backup_{timestamp}{file_path.suffix}"
with open(file_path, 'r', encoding='utf-8', errors='replace') as src:
content = src.read()
with open(backup_path, 'w', encoding='utf-8') as dst:
dst.write(content)
return backup_path
def _validate_file(self, file_path: Path) -> bool:
"""Validate file existence and permissions"""
try:
if not file_path.exists():
logger.error(f"File not found: {file_path}")
return False
if not file_path.is_file():
logger.error(f"Not a file: {file_path}")
return False
# Check read/write permissions
if not os.access(file_path, os.R_OK | os.W_OK):
logger.error(f"Insufficient permissions for {file_path}")
return False
return True
except Exception as e:
logger.error(f"File validation failed: {e}")
return False
def _validate_range(self, start: int, end: int, max_lines: int) -> bool:
"""Validate line range"""
return (0 <= start < max_lines and
0 <= end <= max_lines and
start <= end)
def _validate_modification(
self,
file_path: Path,
original_content: str,
new_content: str
) -> ValidationResult:
"""Validate proposed code modification"""
errors = []
warnings = []
details = {}
try:
# Check for syntax errors in new content
if file_path.suffix == '.py':
try:
ast.parse(new_content)
except SyntaxError as e:
errors.append(f"Syntax error: {str(e)}")
details['syntax_error'] = str(e)
# Check for significant size changes
size_ratio = len(new_content) / len(original_content) if original_content else float('inf')
if size_ratio > 3 or size_ratio < 0.3:
warnings.append(f"Significant size change (ratio: {size_ratio:.2f})")
details['size_ratio'] = size_ratio
# Check for common issues
if new_content.strip() == '':
warnings.append("New content is empty")
if len(new_content.splitlines()) > 100:
warnings.append("New content is very long")
# Indentation check for Python files
if file_path.suffix == '.py':
indent_issues = self._check_indentation(new_content)
if indent_issues:
warnings.extend(indent_issues)
return ValidationResult(
valid=len(errors) == 0,
errors=errors,
warnings=warnings,
details=details
)
except Exception as e:
logger.error(f"Validation failed: {e}")
return ValidationResult(
valid=False,
errors=[str(e)],
warnings=[],
details={'exception': str(e)}
)
def _check_indentation(self, content: str) -> List[str]:
"""Check Python code indentation"""
issues = []
lines = content.splitlines()
for i, line in enumerate(lines, 1):
if line.strip(): # Skip empty lines
# Check if indentation is multiple of 4
indent = len(line) - len(line.lstrip())
if indent % 4 != 0:
issues.append(f"Line {i}: Indentation not a multiple of 4")
# Check for mixed tabs and spaces
if '\t' in line[:indent]:
issues.append(f"Line {i}: Mixed tabs and spaces")
return issues
def _read_file_content(self, file_path: Path) -> Optional[str]:
"""Safely read file content with encoding detection"""
try:
# Try UTF-8 first
with open(file_path, 'r', encoding='utf-8') as f:
return f.read()
except UnicodeDecodeError:
# Try to detect encoding
try:
import chardet
with open(file_path, 'rb') as f:
raw = f.read()
detected = chardet.detect(raw)
encoding = detected['encoding']
with open(file_path, 'r', encoding=encoding) as f:
return f.read()
except Exception as e:
logger.error(f"Failed to read file {file_path}: {e}")
return None
async def _restore_backup(self, backup_path: Path, target_path: Path, encoding: str) -> None:
"""Restore from backup with proper encoding"""
try:
with open(backup_path, 'r', encoding=encoding, errors='replace') as src:
content = src.read()
with open(target_path, 'w', encoding=encoding) as dst:
dst.write(content)
except Exception as e:
logger.error(f"Backup restoration failed: {e}")
def get_changes(
self,
file_path: Optional[str] = None,
change_type: Optional[ChangeType] = None
) -> List[CodeChange]:
"""Get filtered change history"""
changes = self._change_history
if file_path:
changes = [c for c in changes if c.file_path == file_path]
if change_type:
changes = [c for c in changes if c.change_type == change_type]
return changes
def clear_old_backups(self, days: int = 7) -> int:
"""Clear backups older than specified days"""
if not self._backup_dir.exists():
return 0
cleared = 0
cutoff = datetime.now().timestamp() - (days * 86400)
try:
for backup_dir in self._backup_dir.parent.iterdir():
if not backup_dir.is_dir():
continue
if backup_dir.stat().st_mtime < cutoff:
shutil.rmtree(backup_dir)
cleared += 1
return cleared
except Exception as e:
logger.error(f"Failed to clear old backups: {e}")
return cleared
    def get_affected_files(self) -> Set[str]:
        """Return the set of all files flagged by impact analysis.

        Note: this returns the live internal set (not a copy); callers
        mutating it will change the modifier's tracking state.
        """
        return self._affected_files
def clear_change_history(self) -> None:
"""Clear change history and affected files tracking"""
self._change_history.clear()
self._affected_files.clear()