MCP Code Analyzer

import ast
import logging
from pathlib import Path
from typing import Dict, Any, List

from .base import BaseTool, safe_read_file

logger = logging.getLogger(__name__)


class PatternDependencyAnalyzer(BaseTool):
    """Analyze dependencies related to code patterns"""

    async def execute(self, arguments: Dict[str, Any]) -> Dict[str, Any]:
        file_path = arguments.get('file_path')
        pattern = arguments.get('pattern')
        if not file_path or not pattern:
            return {"error": "Both file_path and pattern are required"}

        path = self._normalize_path(file_path)
        if not self._validate_path(path):
            return {"error": "File not found"}

        cache_key = f"pattern_deps_{path}_{pattern}"
        if cached := self._get_cached_result(cache_key):
            return cached

        try:
            content = safe_read_file(str(path))
            if not content:
                return {"error": "Could not read file"}

            result = {
                "pattern": pattern,
                "location": await self._find_pattern_locations(content, pattern),
                "dependencies": await self._analyze_dependencies(content, pattern),
                "impact": await self._analyze_impact(content, pattern),
            }
            self._cache_result(cache_key, result)
            return result
        except Exception as e:
            logger.error(f"Error analyzing pattern dependencies: {e}")
            return {"error": str(e)}

    async def _find_pattern_locations(self, content: str, pattern: str) -> List[Dict[str, Any]]:
        """Find pattern locations in code"""
        locations = []
        try:
            tree = ast.parse(content)
            for node in ast.walk(tree):
                # Check functions
                if isinstance(node, ast.FunctionDef) and pattern in node.name:
                    locations.append({
                        "type": "function",
                        "name": node.name,
                        "line": node.lineno,
                        "end_line": getattr(node, 'end_lineno', node.lineno)
                    })
                # Check classes
                elif isinstance(node, ast.ClassDef) and pattern in node.name:
                    locations.append({
                        "type": "class",
                        "name": node.name,
                        "line": node.lineno,
                        "end_line": getattr(node, 'end_lineno', node.lineno)
                    })
                # Check variables
                elif isinstance(node, ast.Name) and pattern in node.id:
                    locations.append({
                        "type": "variable",
                        "name": node.id,
                        "line": node.lineno,
                        "context": "assignment" if isinstance(node.ctx, ast.Store) else "usage"
                    })
            return locations
        except Exception as e:
            logger.error(f"Error finding pattern locations: {e}")
            return []

    async def _analyze_dependencies(self, content: str, pattern: str) -> Dict[str, Any]:
        """Analyze pattern dependencies"""
        deps = {
            "imports": [],
            "functions": [],
            "variables": []
        }
        try:
            tree = ast.parse(content)
            pattern_nodes = []

            # Find nodes containing the pattern; guard against nodes whose
            # `name` attribute is None (e.g. bare `except` handlers).
            for node in ast.walk(tree):
                if pattern in (getattr(node, 'name', None) or ''):
                    pattern_nodes.append(node)
                elif isinstance(node, ast.Name) and pattern in node.id:
                    pattern_nodes.append(node)

            # Analyze each pattern node
            for node in pattern_nodes:
                # Collect imports; record every alias, not just the first one
                imports = set()
                for n in ast.walk(node):
                    if isinstance(n, ast.Import):
                        imports.update(alias.name for alias in n.names)
                    elif isinstance(n, ast.ImportFrom):
                        module = n.module or ''  # module is None for `from . import x`
                        imports.update(f"{module}.{alias.name}" for alias in n.names)
                deps["imports"].extend(imports)

                # Find function calls
                if isinstance(node, ast.FunctionDef):
                    for n in ast.walk(node):
                        if isinstance(n, ast.Call) and isinstance(n.func, ast.Name):
                            deps["functions"].append({
                                "name": n.func.id,
                                "line": n.lineno
                            })

                # Find variable uses
                if isinstance(node, ast.Name):
                    deps["variables"].append({
                        "name": node.id,
                        "line": node.lineno,
                        "type": type(node.ctx).__name__
                    })
            return deps
        except Exception as e:
            logger.error(f"Error analyzing dependencies: {e}")
            return {}

    async def _analyze_impact(self, content: str, pattern: str) -> Dict[str, Any]:
        """Analyze pattern impact"""
        try:
            tree = ast.parse(content)
            impact = {
                "risk_level": "low",
                "affected_components": [],
                "suggestions": []
            }

            # Count raw pattern occurrences to estimate risk
            occurrences = content.count(pattern)
            if occurrences > 10:
                impact["risk_level"] = "high"
            elif occurrences > 5:
                impact["risk_level"] = "medium"

            # Analyze affected components
            for node in ast.walk(tree):
                if isinstance(node, (ast.FunctionDef, ast.ClassDef)):
                    if pattern in ast.dump(node):
                        impact["affected_components"].append({
                            "type": type(node).__name__.replace("Def", ""),
                            "name": node.name,
                            "line": node.lineno
                        })
                        # Generate suggestions
                        if len(node.body) > 20:
                            impact["suggestions"].append({
                                "type": "refactor_suggestion",
                                "message": f"Consider breaking down {node.name} into smaller components",
                                "line": node.lineno
                            })
            return impact
        except Exception as e:
            logger.error(f"Error analyzing impact: {e}")
            return {}
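
# A minimal invocation sketch for the per-file analyzer above. It assumes
# the BaseTool helpers imported from .base (_normalize_path, _validate_path,
# and the result cache) behave as used here; the target file is hypothetical:
#
#   import asyncio
#
#   analyzer = PatternDependencyAnalyzer()
#   report = asyncio.run(analyzer.execute({
#       "file_path": "app/services.py",   # hypothetical path
#       "pattern": "get_instance",
#   }))
#   # On success the report carries the keys "pattern", "location",
#   # "dependencies", and "impact"; on failure it carries "error".
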
"low", "affected_components": [], "suggestions": [] } # Count pattern occurrences occurrences = content.count(pattern) if occurrences > 10: impact["risk_level"] = "high" elif occurrences > 5: impact["risk_level"] = "medium" # Analyze affected components for node in ast.walk(tree): if isinstance(node, (ast.FunctionDef, ast.ClassDef)): if pattern in ast.dump(node): impact["affected_components"].append({ "type": type(node).__name__.replace("Def", ""), "name": node.name, "line": node.lineno }) # Generate suggestions if len(node.body) > 20: impact["suggestions"].append({ "type": "refactor_suggestion", "message": f"Consider breaking down {node.name} into smaller components", "line": node.lineno }) return impact except Exception as e: logger.error(f"Error analyzing impact: {e}") return {} class SuggestRefactoring(BaseTool): """Suggest refactoring for code patterns""" async def execute(self, arguments: Dict[str, Any]) -> Dict[str, Any]: pattern = arguments.get('pattern') scope = arguments.get('scope', 'all') if not pattern: return {"error": "Pattern is required"} cache_key = f"refactor_{pattern}_{scope}" if cached := self._get_cached_result(cache_key): return cached try: result = { "pattern": pattern, "scope": scope, "suggestions": await self._generate_suggestions(pattern, scope), "impact": await self._analyze_impact(pattern, scope), "alternatives": await self._suggest_alternatives(pattern) } self._cache_result(cache_key, result) return result except Exception as e: logger.error(f"Error generating refactoring suggestions: {e}") return {"error": str(e)} async def _generate_suggestions(self, pattern: str, scope: str) -> List[Dict[str, Any]]: """Generate refactoring suggestions""" suggestions = [] # Basic suggestions based on pattern type if '_' in pattern: # Likely a function or variable name suggestions.append({ "type": "naming", "message": f"Consider using camelCase instead of snake_case for {pattern}", "example": self._to_camel_case(pattern) }) if len(pattern) < 3: # Too short name suggestions.append({ "type": "naming", "message": "Consider using more descriptive name", "severity": "medium" }) if len(pattern) > 30: # Too long name suggestions.append({ "type": "naming", "message": "Consider using shorter, more concise name", "severity": "medium" }) return suggestions async def _analyze_impact(self, pattern: str, scope: str) -> Dict[str, Any]: """Analyze refactoring impact""" return { "risk_level": "medium", # Default risk level "affected_areas": [ "code readability", "maintainability", "extensibility" ], "effort_level": "medium", "benefits": [ "improved code quality", "better maintainability", "clearer code structure" ] } async def _suggest_alternatives(self, pattern: str) -> List[Dict[str, Any]]: """Suggest alternative patterns""" alternatives = [] # Common patterns and their alternatives patterns = { "get_instance": { "type": "singleton", "alternatives": ["dependency_injection", "factory_method"], "reason": "Reduces coupling and improves testability" }, "factory": { "type": "creation", "alternatives": ["builder_pattern", "abstract_factory"], "reason": "More flexible object creation" } } for key, value in patterns.items(): if key in pattern.lower(): alternatives.append(value) return alternatives def _to_camel_case(self, snake_str: str) -> str: """Convert snake_case to camelCase""" components = snake_str.split('_') return components[0] + ''.join(x.title() for x in components[1:]) class CodePatternAnalyzer(BaseTool): """Analyze and detect code patterns""" async def execute(self, arguments: 
class CodePatternAnalyzer(BaseTool):
    """Analyze and detect code patterns"""

    async def execute(self, arguments: Dict[str, Any]) -> Dict[str, Any]:
        path = arguments.get('path')
        if not path:
            return {"error": "Path is required"}

        path = self._normalize_path(path)
        if not self._validate_path(path):
            return {"error": "Invalid path"}

        cache_key = f"code_patterns_{path}"
        if cached := self._get_cached_result(cache_key):
            return cached

        try:
            result = {
                "design_patterns": await self._find_design_patterns(path),
                "anti_patterns": await self._find_anti_patterns(path),
                "code_smells": await self._find_code_smells(path),
                "metrics": await self._calculate_pattern_metrics(path),
                "suggestions": await self._generate_pattern_suggestions(path)
            }
            self._cache_result(cache_key, result)
            return result
        except Exception as e:
            logger.error(f"Error analyzing code patterns: {e}")
            return {"error": str(e)}

    def _should_skip(self, path: Path) -> bool:
        """Check if path should be skipped"""
        try:
            if any(excluded in path.parts for excluded in self.analysis_config.excluded_dirs):
                return True
            if path.is_file() and any(path.name.endswith(ext) for ext in self.analysis_config.excluded_files):
                return True
            return False
        except Exception:
            # Skip anything we cannot inspect rather than crash the scan
            return True

    async def _find_design_patterns(self, path: Path) -> List[Dict[str, Any]]:
        """Find common design patterns"""
        patterns = []
        try:
            for py_file in path.rglob('*.py'):
                if self._should_skip(py_file):
                    continue
                content = safe_read_file(str(py_file))
                if not content:
                    continue
                try:
                    tree = ast.parse(content)
                    for node in ast.walk(tree):
                        if isinstance(node, ast.ClassDef):
                            # Singleton Pattern
                            if self._is_singleton(node):
                                patterns.append({
                                    "type": "singleton",
                                    "class": node.name,
                                    "file": str(py_file),
                                    "line": node.lineno
                                })
                            # Factory Pattern
                            if self._is_factory(node):
                                patterns.append({
                                    "type": "factory",
                                    "class": node.name,
                                    "file": str(py_file),
                                    "line": node.lineno
                                })
                            # Observer Pattern
                            if self._is_observer(node):
                                patterns.append({
                                    "type": "observer",
                                    "class": node.name,
                                    "file": str(py_file),
                                    "line": node.lineno
                                })
                except Exception as e:
                    logger.error(f"Error analyzing patterns in {py_file}: {e}")
        except Exception as e:
            logger.error(f"Error finding design patterns: {e}")
        return patterns

    async def _find_anti_patterns(self, path: Path) -> List[Dict[str, Any]]:
        """Find code anti-patterns"""
        anti_patterns = []
        try:
            for py_file in path.rglob('*.py'):
                if self._should_skip(py_file):
                    continue
                content = safe_read_file(str(py_file))
                if not content:
                    continue
                try:
                    tree = ast.parse(content)
                    for node in ast.walk(tree):
                        # God Class
                        if isinstance(node, ast.ClassDef):
                            methods = len([n for n in node.body if isinstance(n, ast.FunctionDef)])
                            if methods > 20:
                                anti_patterns.append({
                                    "type": "god_class",
                                    "class": node.name,
                                    "file": str(py_file),
                                    "line": node.lineno,
                                    "method_count": methods
                                })
                        # Long Method
                        elif isinstance(node, ast.FunctionDef):
                            if len(node.body) > 50:
                                anti_patterns.append({
                                    "type": "long_method",
                                    "function": node.name,
                                    "file": str(py_file),
                                    "line": node.lineno,
                                    "length": len(node.body)
                                })
                except Exception as e:
                    logger.error(f"Error analyzing anti-patterns in {py_file}: {e}")
        except Exception as e:
            logger.error(f"Error finding anti-patterns: {e}")
        return anti_patterns

    async def _find_code_smells(self, path: Path) -> List[Dict[str, Any]]:
        """Find code smells"""
        smells = []
        try:
            for py_file in path.rglob('*.py'):
                if self._should_skip(py_file):
                    continue
                content = safe_read_file(str(py_file))
                if not content:
                    continue
                try:
                    tree = ast.parse(content)
                    for node in ast.walk(tree):
                        # Duplicate Code (simple structural check): hash the
                        # function *body* only, since ast.dump of the whole
                        # FunctionDef embeds the name and differently named
                        # copies would never match.
                        if isinstance(node, ast.FunctionDef):
                            body_hash = hash(ast.dump(ast.Module(body=node.body, type_ignores=[])))
                            similar_functions = self._find_similar_functions(tree, node, body_hash)
                            if similar_functions:
                                smells.append({
                                    "type": "duplicate_code",
                                    "function": node.name,
                                    "file": str(py_file),
                                    "line": node.lineno,
                                    "similar_to": similar_functions
                                })
                        # Too Many Parameters
                        if isinstance(node, ast.FunctionDef) and len(node.args.args) > 5:
                            smells.append({
                                "type": "too_many_parameters",
                                "function": node.name,
                                "file": str(py_file),
                                "line": node.lineno,
                                "param_count": len(node.args.args)
                            })
                except Exception as e:
                    logger.error(f"Error analyzing code smells in {py_file}: {e}")
        except Exception as e:
            logger.error(f"Error finding code smells: {e}")
        return smells
    async def _calculate_pattern_metrics(self, path: Path) -> Dict[str, Any]:
        """Calculate pattern-related metrics"""
        metrics = {
            "design_pattern_count": 0,
            "anti_pattern_count": 0,
            "code_smell_count": 0,
            "pattern_distribution": {},
            "highest_risk_files": []
        }
        try:
            design_patterns = await self._find_design_patterns(path)
            anti_patterns = await self._find_anti_patterns(path)
            code_smells = await self._find_code_smells(path)

            metrics["design_pattern_count"] = len(design_patterns)
            metrics["anti_pattern_count"] = len(anti_patterns)
            metrics["code_smell_count"] = len(code_smells)

            # Calculate pattern distribution
            all_patterns = design_patterns + anti_patterns
            for pattern in all_patterns:
                pattern_type = pattern["type"]
                metrics["pattern_distribution"][pattern_type] = \
                    metrics["pattern_distribution"].get(pattern_type, 0) + 1

            # Find high-risk files
            file_risks = {}
            for item in anti_patterns + code_smells:
                file_path = item["file"]
                file_risks[file_path] = file_risks.get(file_path, 0) + 1
            metrics["highest_risk_files"] = sorted(
                [{"file": k, "issues": v} for k, v in file_risks.items()],
                key=lambda x: x["issues"],
                reverse=True
            )[:5]
        except Exception as e:
            logger.error(f"Error calculating pattern metrics: {e}")
        return metrics

    async def _generate_pattern_suggestions(self, path: Path) -> List[Dict[str, Any]]:
        """Generate pattern-based suggestions"""
        suggestions = []
        try:
            metrics = await self._calculate_pattern_metrics(path)

            # Suggest refactoring for high-risk files
            for entry in metrics["highest_risk_files"]:
                if entry["issues"] > 5:
                    suggestions.append({
                        "type": "refactoring",
                        "file": entry["file"],
                        "message": f"Consider refactoring: file has {entry['issues']} quality issues",
                        "priority": "high"
                    })

            # Suggest pattern usage improvements
            if metrics["anti_pattern_count"] > metrics["design_pattern_count"]:
                suggestions.append({
                    "type": "pattern_usage",
                    "message": "Consider using more design patterns to improve code structure",
                    "priority": "medium"
                })

            # Suggest code smell cleanup
            if metrics["code_smell_count"] > 10:
                suggestions.append({
                    "type": "code_quality",
                    "message": "High number of code smells detected; consider code cleanup",
                    "priority": "high"
                })
        except Exception as e:
            logger.error(f"Error generating pattern suggestions: {e}")
        return suggestions

    def _is_singleton(self, node: ast.ClassDef) -> bool:
        """Check if class implements singleton pattern"""
        return any(
            isinstance(n, ast.FunctionDef) and n.name in ('get_instance', 'getInstance')
            for n in node.body
        )

    def _is_factory(self, node: ast.ClassDef) -> bool:
        """Check if class implements factory pattern"""
        return any(
            isinstance(n, ast.FunctionDef) and (n.name.startswith('create_') or n.name.startswith('make_'))
            for n in node.body
        )

    def _is_observer(self, node: ast.ClassDef) -> bool:
        """Check if class implements observer pattern"""
        observer_methods = {'update', 'notify', 'subscribe', 'unsubscribe'}
        class_methods = {n.name for n in node.body if isinstance(n, ast.FunctionDef)}
        return bool(observer_methods & class_methods)
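
    # Why the body-only hash used by _find_code_smells and the method below:
    # ast.dump() of a whole FunctionDef embeds the function's name, so two
    # structurally identical functions with different names would never
    # compare equal. A standalone sketch of the idea (illustrative snippet,
    # not part of the tool):
    #
    #   import ast
    #   f = ast.parse("def f():\n    return 1").body[0]
    #   g = ast.parse("def g():\n    return 1").body[0]
    #   ast.dump(f) == ast.dump(g)   # False: the dumps embed 'f' vs 'g'
    #   (ast.dump(ast.Module(body=f.body, type_ignores=[])) ==
    #    ast.dump(ast.Module(body=g.body, type_ignores=[])))   # True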
    def _find_similar_functions(
        self,
        tree: ast.AST,
        current_node: ast.FunctionDef,
        current_hash: int
    ) -> List[Dict[str, Any]]:
        """Find functions with similar structure"""
        similar = []
        for node in ast.walk(tree):
            if (isinstance(node, ast.FunctionDef)
                    and node.name != current_node.name
                    and hash(ast.dump(ast.Module(body=node.body, type_ignores=[]))) == current_hash):
                similar.append({
                    "name": node.name,
                    "line": node.lineno
                })
        return similar


class PatternUsageAnalyzer(BaseTool):
    """Find pattern usages and perform pattern analysis"""

    async def execute(self, arguments: Dict[str, Any]) -> Dict[str, Any]:
        pattern = arguments.get('pattern')
        pattern_type = arguments.get('pattern_type', 'all')
        if not pattern:
            return {"error": "Pattern is required"}

        cache_key = f"pattern_usage_{pattern}_{pattern_type}"
        if cached := self._get_cached_result(cache_key):
            return cached

        try:
            result = {
                "pattern": pattern,
                "type": pattern_type,
                "occurrences": await self._find_occurrences(pattern, pattern_type),
                "statistics": await self._generate_statistics(pattern, pattern_type),
                "context": await self._analyze_context(pattern, pattern_type),
                "suggestions": await self._generate_suggestions(pattern, pattern_type)
            }
            self._cache_result(cache_key, result)
            return result
        except Exception as e:
            logger.error(f"Error analyzing pattern usage: {e}")
            return {"error": str(e)}

    def _should_skip(self, path: Path) -> bool:
        """Check if path should be skipped"""
        try:
            if any(excluded in path.parts for excluded in self.analysis_config.excluded_dirs):
                return True
            if path.is_file() and any(path.name.endswith(ext) for ext in self.analysis_config.excluded_files):
                return True
            return False
        except Exception:
            # Skip anything we cannot inspect rather than crash the scan
            return True

    async def _find_occurrences(self, pattern: str, pattern_type: str) -> List[Dict[str, Any]]:
        """Find pattern occurrences (scans *.py under the current working directory)"""
        occurrences = []
        try:
            for py_file in Path('.').rglob('*.py'):
                if self._should_skip(py_file):
                    continue
                content = safe_read_file(str(py_file))
                if not content:
                    continue
                try:
                    tree = ast.parse(content)
                    for node in ast.walk(tree):
                        occurrence = None
                        if pattern_type in ['all', 'function']:
                            if isinstance(node, ast.FunctionDef) and pattern in node.name:
                                occurrence = {
                                    "type": "function",
                                    "name": node.name,
                                    "line": node.lineno,
                                    "args": len(node.args.args)
                                }
                        if pattern_type in ['all', 'class']:
                            if isinstance(node, ast.ClassDef) and pattern in node.name:
                                occurrence = {
                                    "type": "class",
                                    "name": node.name,
                                    "line": node.lineno,
                                    "methods": len([m for m in node.body if isinstance(m, ast.FunctionDef)])
                                }
                        if pattern_type in ['all', 'variable']:
                            if isinstance(node, ast.Name) and pattern in node.id:
                                occurrence = {
                                    "type": "variable",
                                    "name": node.id,
                                    "line": node.lineno,
                                    "context": type(node.ctx).__name__
                                }
                        if pattern_type in ['all', 'code']:
                            if isinstance(node, ast.Expr) and pattern in ast.dump(node):
                                occurrence = {
                                    "type": "code",
                                    "line": node.lineno,
                                    # ast.unparse requires Python 3.9+
                                    "content": ast.unparse(node)
                                }
                        if occurrence:
                            occurrence["file"] = str(py_file)
                            occurrences.append(occurrence)
                except Exception as e:
                    logger.error(f"Error analyzing {py_file}: {e}")
        except Exception as e:
            logger.error(f"Error finding occurrences: {e}")
        return occurrences
stats["by_file"][occ["file"]] = stats["by_file"].get(occ["file"], 0) + 1 # Calculate frequency metrics if stats["total_occurrences"] > 0: total_files = len(set(occ["file"] for occ in occurrences)) stats["frequency"] = { "occurrences_per_file": stats["total_occurrences"] / total_files, "files_with_pattern": total_files } except Exception as e: logger.error(f"Error generating statistics: {e}") return stats async def _analyze_context(self, pattern: str, pattern_type: str) -> Dict[str, Any]: """Analyze pattern usage context""" context = { "common_patterns": [], "related_patterns": [], "usage_examples": [] } try: occurrences = await self._find_occurrences(pattern, pattern_type) for occ in occurrences: if len(context["usage_examples"]) < 5: # Limit examples content = safe_read_file(occ["file"]) if content: lines = content.splitlines() line_idx = occ["line"] - 1 # Get context lines start = max(0, line_idx - 2) end = min(len(lines), line_idx + 3) context["usage_examples"].append({ "file": occ["file"], "line": occ["line"], "code": lines[start:end], "type": occ["type"] }) # Find related patterns (patterns that often appear together) pattern_co_occurrences = {} for occ in occurrences: content = safe_read_file(occ["file"]) if content: tree = ast.parse(content) for node in ast.walk(tree): if isinstance(node, (ast.Name, ast.FunctionDef, ast.ClassDef)): name = getattr(node, 'id', getattr(node, 'name', None)) if name and name != pattern: pattern_co_occurrences[name] = pattern_co_occurrences.get(name, 0) + 1 # Get most common co-occurring patterns context["related_patterns"] = sorted( [{"pattern": k, "count": v} for k, v in pattern_co_occurrences.items()], key=lambda x: x["count"], reverse=True )[:5] except Exception as e: logger.error(f"Error analyzing context: {e}") return context async def _generate_suggestions(self, pattern: str, pattern_type: str) -> List[Dict[str, Any]]: """Generate pattern-related suggestions""" suggestions = [] try: stats = await self._generate_statistics(pattern, pattern_type) # Suggest refactoring if pattern is used too frequently if stats["total_occurrences"] > 20: suggestions.append({ "type": "refactoring", "message": f"Consider refactoring: pattern '{pattern}' is used frequently", "severity": "high" }) # Suggest consistency in usage if pattern_type == 'variable' and '_' in pattern: suggestions.append({ "type": "naming", "message": "Consider using consistent naming convention", "example": self._to_camel_case(pattern) }) # Suggest documentation if pattern is widely used if stats["total_occurrences"] > 10: suggestions.append({ "type": "documentation", "message": f"Consider documenting the pattern usage and purpose", "importance": "medium" }) # Check for pattern complexity context = await self._analyze_context(pattern, pattern_type) if len(context["related_patterns"]) > 5: suggestions.append({ "type": "complexity", "message": "Pattern has many dependencies, consider simplifying", "severity": "medium" }) except Exception as e: logger.error(f"Error generating suggestions: {e}") return suggestions def _to_camel_case(self, snake_str: str) -> str: """Convert snake_case to camelCase""" components = snake_str.split('_') return components[0] + ''.join(x.title() for x in components[1:])