MCP Code Analyzer

import ast
import io
import logging
import re
import tokenize
from dataclasses import dataclass
from datetime import datetime
from pathlib import Path
from typing import Any, Dict, List, Optional, Set

import networkx as nx

from .base import BaseTool, safe_read_file

logger = logging.getLogger(__name__)


class CodeStructureAnalyzer(BaseTool):
    """Analyze code structure and architecture"""

    async def execute(self, arguments: Dict[str, Any]) -> Dict[str, Any]:
        path = arguments.get('path')
        if not path:
            return {"error": "Path is required"}

        path = self._normalize_path(path)
        if not self._validate_path(path):
            return {"error": "Invalid path"}

        cache_key = f"structure_{path}"
        if cached := self._get_cached_result(cache_key):
            return cached

        try:
            result = {
                "structure": await self._analyze_structure(path),
                "metrics": await self._analyze_metrics(path),
                "dependencies": await self._analyze_dependencies(path),
                "architecture": await self._analyze_architecture(path)
            }
            self._cache_result(cache_key, result)
            return result
        except Exception as e:
            logger.error(f"Error analyzing code structure: {e}")
            return {"error": str(e)}

    def _should_skip(self, path: Path) -> bool:
        """Check if path should be skipped"""
        try:
            if any(excluded in path.parts for excluded in self.analysis_config.excluded_dirs):
                return True
            if path.is_file() and any(path.name.endswith(ext) for ext in self.analysis_config.excluded_files):
                return True
            return False
        except Exception:
            return True

    async def _analyze_structure(self, path: Path) -> Dict[str, Any]:
        """Analyze code structure while maintaining existing path handling"""
        structure = {
            "modules": [],
            "classes": [],
            "functions": [],
            "hierarchy": {}
        }
        try:
            if isinstance(path, str):
                path = Path(path)
            for py_file in path.rglob('*.py'):
                if not self._should_skip(py_file):
                    content = safe_read_file(str(py_file))
                    if not content:
                        continue
                    try:
                        tree = ast.parse(content)
                        # Attach parent links so module-level functions can be told apart from methods
                        for parent in ast.walk(tree):
                            for child in ast.iter_child_nodes(parent):
                                child.parent = parent
                        module_info = {
                            "name": py_file.stem,
                            "path": str(py_file.relative_to(path) if path.exists() else py_file),
                            "classes": [],
                            "functions": []
                        }
                        for node in ast.walk(tree):
                            if isinstance(node, ast.ClassDef):
                                class_info = {
                                    "name": node.name,
                                    "line": node.lineno,
                                    "methods": [m.name for m in node.body if isinstance(m, ast.FunctionDef)],
                                    "bases": [b.id for b in node.bases if isinstance(b, ast.Name)]
                                }
                                module_info["classes"].append(class_info)
                                structure["classes"].append(class_info)
                            elif isinstance(node, ast.FunctionDef):
                                if not isinstance(getattr(node, "parent", None), ast.ClassDef):
                                    func_info = {
                                        "name": node.name,
                                        "line": node.lineno,
                                        "args": len(node.args.args),
                                        "module": module_info["name"]
                                    }
                                    module_info["functions"].append(func_info)
                                    structure["functions"].append(func_info)
                        structure["modules"].append(module_info)
                    except Exception as e:
                        logger.error(f"Error parsing {py_file}: {e}")
                        continue

            # Build hierarchy
            structure["hierarchy"] = self._build_hierarchy(structure["classes"])
        except Exception as e:
            logger.error(f"Error analyzing structure: {e}")
        return structure

    async def _analyze_metrics(self, path: Path) -> Dict[str, Any]:
        """Analyze code metrics"""
        metrics = {
            "total_lines": 0,
            "code_lines": 0,
            "comment_lines": 0,
            "class_count": 0,
            "function_count": 0,
            "complexity": {
                "average": 0,
                "highest": 0,
                "modules": {}
            }
        }
        file_count = 0
        total_complexity = 0
        try:
            for py_file in path.rglob('*.py'):
                if not self._should_skip(py_file):
                    content = safe_read_file(str(py_file))
                    if not content:
                        continue
                    file_count += 1
                    lines = content.splitlines()
                    metrics["total_lines"] += len(lines)

                    # Count different types of lines
                    for line in lines:
                        stripped = line.strip()
                        if stripped and not stripped.startswith('#'):
                            metrics["code_lines"] += 1
                        elif stripped.startswith('#'):
                            metrics["comment_lines"] += 1

                    # Analyze AST
                    try:
                        tree = ast.parse(content)
                        module_complexity = 0
                        for node in ast.walk(tree):
                            if isinstance(node, ast.ClassDef):
                                metrics["class_count"] += 1
                                module_complexity += len(node.body)
                            elif isinstance(node, ast.FunctionDef):
                                metrics["function_count"] += 1
                                module_complexity += len(node.body)
                        metrics["complexity"]["modules"][py_file.stem] = module_complexity
                        total_complexity += module_complexity
                        metrics["complexity"]["highest"] = max(
                            metrics["complexity"]["highest"],
                            module_complexity
                        )
                    except Exception as e:
                        logger.error(f"Error analyzing metrics for {py_file}: {e}")

            if file_count > 0:
                metrics["complexity"]["average"] = total_complexity / file_count
        except Exception as e:
            logger.error(f"Error analyzing metrics: {e}")
        return metrics

    async def _analyze_dependencies(self, path: Path) -> Dict[str, Any]:
        """Analyze code dependencies"""
        deps = {
            "imports": {},
            "dependencies": {},
            "cycles": []
        }
        # Create dependency graph
        graph = nx.DiGraph()
        try:
            for py_file in path.rglob('*.py'):
                if not self._should_skip(py_file):
                    content = safe_read_file(str(py_file))
                    if not content:
                        continue
                    module_name = py_file.stem
                    deps["imports"][module_name] = []
                    try:
                        tree = ast.parse(content)
                        for node in ast.walk(tree):
                            if isinstance(node, ast.Import):
                                for name in node.names:
                                    deps["imports"][module_name].append({
                                        "name": name.name,
                                        "alias": name.asname,
                                        "type": "direct"
                                    })
                                    graph.add_edge(module_name, name.name)
                            elif isinstance(node, ast.ImportFrom):
                                if node.module:
                                    deps["imports"][module_name].append({
                                        "name": node.module,
                                        "imports": [n.name for n in node.names],
                                        "type": "from"
                                    })
                                    graph.add_edge(module_name, node.module)
                    except Exception as e:
                        logger.error(f"Error analyzing dependencies for {py_file}: {e}")

            # Find dependency cycles
            try:
                cycles = list(nx.simple_cycles(graph))
                deps["cycles"] = [{"modules": cycle} for cycle in cycles]
            except Exception as e:
                logger.error(f"Error finding dependency cycles: {e}")

            # Convert graph to dependency dict
            for node in graph.nodes():
                deps["dependencies"][node] = {
                    "imports": list(graph.successors(node)),
                    "imported_by": list(graph.predecessors(node))
                }
        except Exception as e:
            logger.error(f"Error analyzing dependencies: {e}")
        return deps

    async def _analyze_architecture(self, path: Path) -> Dict[str, Any]:
        """Analyze code architecture"""
        architecture = {
            "layers": [],
            "components": [],
            "interfaces": [],
            "patterns": []
        }
        try:
            for py_file in path.rglob('*.py'):
                if not self._should_skip(py_file):
                    content = safe_read_file(str(py_file))
                    if not content:
                        continue
                    try:
                        tree = ast.parse(content)
                        # Analyze interfaces (abstract classes)
                        for node in ast.walk(tree):
                            if isinstance(node, ast.ClassDef):
                                if any(isinstance(child, ast.FunctionDef) and isinstance(child.body[0], ast.Pass)
                                       for child in node.body):
                                    architecture["interfaces"].append({
                                        "name": node.name,
                                        "file": str(py_file.relative_to(path)),
                                        "methods": [m.name for m in node.body if isinstance(m, ast.FunctionDef)]
                                    })
                                # Detect common patterns
                                if any(base.id == 'ABC' for base in node.bases if isinstance(base, ast.Name)):
                                    architecture["patterns"].append({
                                        "type": "abstract_class",
                                        "name": node.name,
                                        "file": str(py_file.relative_to(path))
                                    })
                    except Exception as e:
                        logger.error(f"Error analyzing architecture for {py_file}: {e}")

            # Identify layers based on directory structure
            layers = set()
            for item in path.iterdir():
                if item.is_dir() and not item.name.startswith('.'):
                    layers.add(item.name)
            architecture["layers"] = sorted(list(layers))

            # Identify components (modules with multiple classes)
            components = {}
            for cls in architecture["interfaces"]:
                module = Path(cls["file"]).parent.name
                if module not in components:
                    components[module] = {
                        "name": module,
                        "interfaces": [],
                        "implementations": []
                    }
                components[module]["interfaces"].append(cls["name"])
            architecture["components"] = list(components.values())
        except Exception as e:
            logger.error(f"Error analyzing architecture: {e}")
        return architecture

    def _build_hierarchy(self, classes: List[Dict[str, Any]]) -> Dict[str, Any]:
        """Build class hierarchy"""
        hierarchy = {}
        for cls in classes:
            if not cls["bases"]:
                if cls["name"] not in hierarchy:
                    hierarchy[cls["name"]] = []
            else:
                for base in cls["bases"]:
                    if base not in hierarchy:
                        hierarchy[base] = []
                    hierarchy[base].append(cls["name"])
        return hierarchy


class ImportAnalyzer(BaseTool):
    """Analyze import statements and dependencies"""

    async def execute(self, arguments: Dict[str, Any]) -> Dict[str, Any]:
        path = arguments.get('path')
        if not path:
            return {"error": "Path is required"}

        path = self._normalize_path(path)
        if not self._validate_path(path):
            return {"error": "Invalid path"}

        cache_key = f"imports_{path}"
        if cached := self._get_cached_result(cache_key):
            return cached

        try:
            result = {
                "imports": await self._analyze_imports(path),
                "statistics": await self._generate_statistics(path),
                "issues": await self._find_issues(path),
                "suggestions": await self._generate_suggestions(path)
            }
            self._cache_result(cache_key, result)
            return result
        except Exception as e:
            logger.error(f"Error analyzing imports: {e}")
            return {"error": str(e)}

    def _should_skip(self, path: Path) -> bool:
        """Check if path should be skipped"""
        try:
            if any(excluded in path.parts for excluded in self.analysis_config.excluded_dirs):
                return True
            if path.is_file() and any(path.name.endswith(ext) for ext in self.analysis_config.excluded_files):
                return True
            return False
        except Exception:
            return True

    async def _analyze_imports(self, path: Path) -> Dict[str, Any]:
        """Analyze all imports in the project"""
        imports = {
            "standard_lib": set(),
            "third_party": set(),
            "local": set(),
            "by_file": {}
        }
        try:
            for py_file in path.rglob('*.py'):
                if not self._should_skip(py_file):
                    content = safe_read_file(str(py_file))
                    if not content:
                        continue
                    file_imports = {
                        "standard_lib": [],
                        "third_party": [],
                        "local": []
                    }
                    try:
                        tree = ast.parse(content)
                        for node in ast.walk(tree):
                            if isinstance(node, ast.Import):
                                for name in node.names:
                                    import_info = {
                                        "name": name.name,
                                        "alias": name.asname,
                                        "line": node.lineno
                                    }
                                    if self._is_stdlib_import(name.name):
                                        imports["standard_lib"].add(name.name)
                                        file_imports["standard_lib"].append(import_info)
                                    else:
                                        imports["third_party"].add(name.name)
                                        file_imports["third_party"].append(import_info)
                            elif isinstance(node, ast.ImportFrom):
                                for name in node.names:
                                    import_info = {
                                        "module": node.module,
                                        "name": name.name,
                                        "alias": name.asname,
                                        "line": node.lineno
                                    }
                                    # Assume local import if relative or matches project structure
                                    if node.level > 0 or str(path) in str(py_file):
                                        imports["local"].add(f"{node.module}.{name.name}")
                                        file_imports["local"].append(import_info)
                                    elif self._is_stdlib_import(node.module):
                                        imports["standard_lib"].add(f"{node.module}.{name.name}")
                                        file_imports["standard_lib"].append(import_info)
                                    else:
                                        imports["third_party"].add(f"{node.module}.{name.name}")
                                        file_imports["third_party"].append(import_info)
                    except Exception as e:
                        logger.error(f"Error analyzing imports for {py_file}: {e}")
imports["by_file"][str(py_file.relative_to(path))] = file_imports except Exception as e: logger.error(f"Error analyzing imports: {e}") # Convert sets to sorted lists for JSON serialization imports["standard_lib"] = sorted(imports["standard_lib"]) imports["third_party"] = sorted(imports["third_party"]) imports["local"] = sorted(imports["local"]) return imports async def _generate_statistics(self, path: Path) -> Dict[str, Any]: """Generate import statistics""" stats = { "total_imports": 0, "by_type": { "standard_lib": 0, "third_party": 0, "local": 0 }, "most_imported": [], "files_with_most_imports": [] } try: imports = await self._analyze_imports(path) import_counts = {} file_import_counts = {} # Count imports by type stats["by_type"]["standard_lib"] = len(imports["standard_lib"]) stats["by_type"]["third_party"] = len(imports["third_party"]) stats["by_type"]["local"] = len(imports["local"]) stats["total_imports"] = sum(stats["by_type"].values()) # Count individual imports for file_imports in imports["by_file"].values(): for import_type in ["standard_lib", "third_party", "local"]: for imp in file_imports[import_type]: name = imp.get("module", imp["name"]) import_counts[name] = import_counts.get(name, 0) + 1 # Count imports per file for file, file_imports in imports["by_file"].items(): count = sum(len(imps) for imps in file_imports.values()) file_import_counts[file] = count # Get most used imports stats["most_imported"] = sorted( [{"name": k, "count": v} for k, v in import_counts.items()], key=lambda x: x["count"], reverse=True )[:10] # Get files with most imports stats["files_with_most_imports"] = sorted( [{"file": k, "count": v} for k, v in file_import_counts.items()], key=lambda x: x["count"], reverse=True )[:10] except Exception as e: logger.error(f"Error generating import statistics: {e}") return stats async def _find_issues(self, path: Path) -> List[Dict[str, Any]]: """Find potential import issues""" issues = [] try: imports = await self._analyze_imports(path) for file, file_imports in imports["by_file"].items(): # Check for duplicate imports all_imports = [] for imp_type in file_imports.values(): for imp in imp_type: name = imp.get("module", imp["name"]) if name in all_imports: issues.append({ "type": "duplicate_import", "file": file, "line": imp["line"], "import": name, "severity": "warning" }) all_imports.append(name) # Check for unused imports (basic check) content = safe_read_file(str(Path(path) / file)) if content: for imp_type in file_imports.values(): for imp in imp_type: name = imp.get("name") if name and name not in content.split(f"import {name}")[1:]: issues.append({ "type": "potentially_unused", "file": file, "line": imp["line"], "import": name, "severity": "info" }) except Exception as e: logger.error(f"Error finding import issues: {e}") return issues async def _generate_suggestions(self, path: Path) -> List[Dict[str, Any]]: """Generate import-related suggestions""" suggestions = [] try: stats = await self._generate_statistics(path) issues = await self._find_issues(path) # Suggest organizing imports if there are many if stats["total_imports"] > 50: suggestions.append({ "type": "organization", "message": "Consider using import organization tools like isort", "reason": "Large number of imports" }) # Suggest import aliasing for commonly used long names for imp in stats["most_imported"]: if len(imp["name"].split('.')) > 2 and imp["count"] > 5: suggestions.append({ "type": "alias", "message": f"Consider using an alias for frequently used import: {imp['name']}", "example": 
f"import {imp['name']} as {imp['name'].split('.')[-1].lower()}" }) # Suggest fixing issues duplicate_count = len([i for i in issues if i["type"] == "duplicate_import"]) if duplicate_count > 0: suggestions.append({ "type": "cleanup", "message": f"Clean up {duplicate_count} duplicate imports", "importance": "high" }) except Exception as e: logger.error(f"Error generating import suggestions: {e}") return suggestions def _is_stdlib_import(self, module_name: str) -> bool: """Check if import is from Python standard library""" stdlib_modules = { 'abc', 'argparse', 'ast', 'asyncio', 'collections', 'concurrent', 'contextlib', 'datetime', 'functools', 'importlib', 'inspect', 'io', 'json', 'logging', 'math', 'os', 'pathlib', 'pickle', 're', 'sys', 'threading', 'time', 'typing', 'uuid', 'warnings' } return module_name.split('.')[0] in stdlib_modules @dataclass class AnalysisResult: """Code analysis result container""" success: bool data: Optional[Dict[str, Any]] = None error: Optional[str] = None class ProjectAnalyzer(BaseTool): """Advanced project structure and code analysis tool""" async def execute(self, arguments: Dict[str, Any]) -> Dict[str, Any]: operation = arguments.get('operation', 'analyze') target_path = arguments.get('path', '.') operations = { 'analyze': self._analyze_project, 'structure': self._analyze_structure, 'dependencies': self._analyze_dependencies, 'complexity': self._analyze_complexity, 'patterns': self._analyze_patterns } if operation not in operations: return {"error": f"Unknown operation: {operation}"} try: result = await operations[operation](Path(target_path), arguments) return {"success": True, "data": result} except Exception as e: logger.error(f"ProjectAnalyzer operation failed: {e}") return {"success": False, "error": str(e)} async def _analyze_project(self, path: Path, args: Dict[str, Any]) -> Dict[str, Any]: """Perform comprehensive project analysis""" try: structure = await self._analyze_structure(path, args) dependencies = await self._analyze_dependencies(path, args) complexity = await self._analyze_complexity(path, args) patterns = await self._analyze_patterns(path, args) return { "overview": { "path": str(path), "timestamp": datetime.now().isoformat(), "total_files": len(structure.get("files", [])), "total_lines": complexity.get("total_lines", 0) }, "structure": structure, "dependencies": dependencies, "complexity": complexity, "patterns": patterns } except Exception as e: raise RuntimeError(f"Project analysis failed: {e}") async def _analyze_structure(self, path: Path, args: Dict[str, Any]) -> Dict[str, Any]: """Analyze project structure""" files = [] modules = [] try: for file_path in path.rglob("*.py"): if self._should_skip(file_path): continue with tokenize.open(file_path) as f: content = f.read() try: tree = ast.parse(content) module_info = { "name": file_path.stem, "path": str(file_path.relative_to(path)), "classes": [], "functions": [], "imports": [] } for node in ast.walk(tree): if isinstance(node, ast.ClassDef): module_info["classes"].append({ "name": node.name, "line": node.lineno, "methods": len([m for m in node.body if isinstance(m, ast.FunctionDef)]), "bases": [base.id for base in node.bases if isinstance(base, ast.Name)] }) elif isinstance(node, ast.FunctionDef): if not any(isinstance(parent, ast.ClassDef) for parent in ast.walk(tree)): module_info["functions"].append({ "name": node.name, "line": node.lineno, "arguments": len(node.args.args), "decorators": [d.id for d in node.decorator_list if isinstance(d, ast.Name)] }) elif isinstance(node, 
                        elif isinstance(node, (ast.Import, ast.ImportFrom)):
                            if isinstance(node, ast.Import):
                                for alias in node.names:
                                    module_info["imports"].append({
                                        "type": "import",
                                        "name": alias.name,
                                        "alias": alias.asname
                                    })
                            else:
                                for alias in node.names:
                                    module_info["imports"].append({
                                        "type": "from",
                                        "module": node.module,
                                        "name": alias.name,
                                        "alias": alias.asname
                                    })
                    modules.append(module_info)
                    files.append({
                        "path": str(file_path.relative_to(path)),
                        "size": file_path.stat().st_size,
                        "last_modified": datetime.fromtimestamp(file_path.stat().st_mtime).isoformat()
                    })
                except Exception as e:
                    logger.error(f"Failed to analyze {file_path}: {e}")

            return {
                "files": files,
                "modules": modules,
                "statistics": {
                    "total_files": len(files),
                    "total_modules": len(modules),
                    "total_classes": sum(len(m["classes"]) for m in modules),
                    "total_functions": sum(len(m["functions"]) for m in modules)
                }
            }
        except Exception as e:
            raise RuntimeError(f"Structure analysis failed: {e}")

    async def _analyze_dependencies(self, path: Path, args: Dict[str, Any]) -> Dict[str, Any]:
        """Analyze project dependencies"""
        try:
            import_graph = {}
            external_deps = set()
            stdlib_deps = set()

            for file_path in path.rglob("*.py"):
                if self._should_skip(file_path):
                    continue
                with tokenize.open(file_path) as f:
                    content = f.read()
                try:
                    tree = ast.parse(content)
                    relative_path = str(file_path.relative_to(path))
                    import_graph[relative_path] = {"imports": [], "imported_by": []}

                    for node in ast.walk(tree):
                        if isinstance(node, ast.Import):
                            for alias in node.names:
                                import_graph[relative_path]["imports"].append(alias.name)
                                if self._is_stdlib_module(alias.name):
                                    stdlib_deps.add(alias.name)
                                else:
                                    external_deps.add(alias.name)
                        elif isinstance(node, ast.ImportFrom):
                            if node.module:
                                import_graph[relative_path]["imports"].append(node.module)
                                if self._is_stdlib_module(node.module):
                                    stdlib_deps.add(node.module)
                                else:
                                    external_deps.add(node.module)
                except Exception as e:
                    logger.error(f"Failed to analyze dependencies in {file_path}: {e}")

            # Build imported_by relationships
            for file_path, deps in import_graph.items():
                for imported in deps["imports"]:
                    for other_file, other_deps in import_graph.items():
                        if imported in other_file:
                            other_deps["imported_by"].append(file_path)

            return {
                "import_graph": import_graph,
                "external_dependencies": sorted(list(external_deps)),
                "stdlib_dependencies": sorted(list(stdlib_deps)),
                "statistics": {
                    "total_imports": sum(len(d["imports"]) for d in import_graph.values()),
                    "external_deps_count": len(external_deps),
                    "stdlib_deps_count": len(stdlib_deps)
                }
            }
        except Exception as e:
            raise RuntimeError(f"Dependency analysis failed: {e}")

    async def _analyze_complexity(self, path: Path, args: Dict[str, Any]) -> Dict[str, Any]:
        """Analyze code complexity metrics"""
        try:
            complexity_data = {
                "files": {},
                "total_lines": 0,
                "total_complexity": 0,
                "average_complexity": 0,
                "hotspots": []
            }
            file_count = 0

            for file_path in path.rglob("*.py"):
                if self._should_skip(file_path):
                    continue
                with tokenize.open(file_path) as f:
                    content = f.read()
                try:
                    tree = ast.parse(content)
                    lines = content.splitlines()
                    file_complexity = {
                        "lines": len(lines),
                        "code_lines": len([l for l in lines if l.strip() and not l.strip().startswith("#")]),
                        "classes": [],
                        "functions": [],
                        "complexity_score": 0
                    }

                    # Analyze classes and methods
                    for node in ast.walk(tree):
                        if isinstance(node, ast.ClassDef):
                            class_info = {
                                "name": node.name,
                                "methods": [],
                                "complexity": self._calculate_node_complexity(node)
                            }
                            for method in node.body:
                                if isinstance(method, ast.FunctionDef):
                                    method_complexity = self._calculate_node_complexity(method)
class_info["methods"].append({ "name": method.name, "complexity": method_complexity }) class_info["complexity"] += method_complexity file_complexity["classes"].append(class_info) file_complexity["complexity_score"] += class_info["complexity"] elif isinstance(node, ast.FunctionDef): if not any(isinstance(parent, ast.ClassDef) for parent in ast.walk(tree)): func_complexity = self._calculate_node_complexity(node) file_complexity["functions"].append({ "name": node.name, "complexity": func_complexity }) file_complexity["complexity_score"] += func_complexity relative_path = str(file_path.relative_to(path)) complexity_data["files"][relative_path] = file_complexity complexity_data["total_lines"] += file_complexity["lines"] complexity_data["total_complexity"] += file_complexity["complexity_score"] # Track potential hotspots if file_complexity["complexity_score"] > 20: # Arbitrary threshold complexity_data["hotspots"].append({ "file": relative_path, "complexity": file_complexity["complexity_score"], "lines": file_complexity["lines"] }) file_count += 1 except Exception as e: logger.error(f"Failed to analyze complexity in {file_path}: {e}") if file_count > 0: complexity_data["average_complexity"] = complexity_data["total_complexity"] / file_count # Sort hotspots by complexity complexity_data["hotspots"].sort(key=lambda x: x["complexity"], reverse=True) return complexity_data except Exception as e: raise RuntimeError(f"Complexity analysis failed: {e}") async def _analyze_patterns(self, path: Path, args: Dict[str, Any]) -> Dict[str, Any]: """Analyze code patterns and anti-patterns""" try: patterns_data = { "design_patterns": [], "anti_patterns": [], "code_smells": [], "best_practices": { "followed": [], "violations": [] } } for file_path in path.rglob("*.py"): if self._should_skip(file_path): continue with tokenize.open(file_path) as f: content = f.read() try: tree = ast.parse(content) relative_path = str(file_path.relative_to(path)) # Analyze design patterns self._find_design_patterns(tree, relative_path, patterns_data) # Analyze anti-patterns and code smells self._find_anti_patterns(tree, relative_path, patterns_data) # Check best practices self._check_best_practices(tree, relative_path, patterns_data) except Exception as e: logger.error(f"Failed to analyze patterns in {file_path}: {e}") return patterns_data except Exception as e: raise RuntimeError(f"Pattern analysis failed: {e}") def _calculate_node_complexity(self, node: ast.AST) -> int: """Calculate cyclomatic complexity of an AST node""" complexity = 1 # Base complexity for child in ast.walk(node): if isinstance(child, (ast.If, ast.While, ast.For, ast.ExceptHandler)): complexity += 1 elif isinstance(child, ast.BoolOp): complexity += len(child.values) - 1 return complexity def _is_stdlib_module(self, module_name: str) -> bool: """Check if a module is from Python standard library""" stdlib_modules = { 'abc', 'argparse', 'ast', 'asyncio', 'collections', 'concurrent', 'contextlib', 'datetime', 'functools', 'importlib', 'inspect', 'io', 'json', 'logging', 'math', 'os', 'pathlib', 'pickle', 're', 'sys', 'threading', 'time', 'typing', 'uuid', 'warnings' } return module_name.split('.')[0] in stdlib_modules def _find_design_patterns(self, tree: ast.AST, file_path: str, patterns_data: Dict[str, Any]) -> None: """Find common design patterns in code""" for node in ast.walk(tree): if isinstance(node, ast.ClassDef): # Singleton pattern if any(method.name == 'get_instance' for method in node.body if isinstance(method, ast.FunctionDef)): 
patterns_data["design_patterns"].append({ "type": "singleton", "file": file_path, "class": node.name, "line": node.lineno }) # Factory pattern if any(method.name.startswith(('create_', 'make_')) for method in node.body if isinstance(method, ast.FunctionDef)): patterns_data["design_patterns"].append({ "type": "factory", "file": file_path, "class": node.name, "line": node.lineno }) # Observer pattern if any(method.name in ('update', 'notify', 'subscribe', 'unsubscribe') for method in node.body if isinstance(method, ast.FunctionDef)): patterns_data["design_patterns"].append({ "type": "observer", "file": file_path, "class": node.name, "line": node.lineno }) def _find_anti_patterns(self, tree: ast.AST, file_path: str, patterns_data: Dict[str, Any]) -> None: """Find anti-patterns and code smells""" for node in ast.walk(tree): # God Class (too many methods) if isinstance(node, ast.ClassDef): methods = len([n for n in node.body if isinstance(n, ast.FunctionDef)]) if methods > 20: patterns_data["anti_patterns"].append({ "type": "god_class", "file": file_path, "class": node.name, "methods": methods, "line": node.lineno }) # Long Method if isinstance(node, ast.FunctionDef): if len(node.body) > 50: patterns_data["code_smells"].append({ "type": "long_method", "file": file_path, "function": node.name, "length": len(node.body), "line": node.lineno }) # Too Many Parameters if len(node.args.args) > 5: patterns_data["code_smells"].append({ "type": "too_many_parameters", "file": file_path, "function": node.name, "parameter_count": len(node.args.args), "line": node.lineno }) def _check_best_practices(self, tree: ast.AST, file_path: str, patterns_data: Dict[str, Any]) -> None: """Check Python best practices""" for node in ast.walk(tree): # Proper docstrings if isinstance(node, (ast.ClassDef, ast.FunctionDef)): if ast.get_docstring(node): patterns_data["best_practices"]["followed"].append({ "type": "has_docstring", "file": file_path, "name": node.name, "line": node.lineno }) else: patterns_data["best_practices"]["violations"].append({ "type": "missing_docstring", "file": file_path, "name": node.name, "line": node.lineno }) class CodeValidator(BaseTool): """Advanced code validation and quality checker""" async def execute(self, arguments: Dict[str, Any]) -> Dict[str, Any]: target_path = arguments.get('path') validation_type = arguments.get('type', 'all') if not target_path: return {"error": "Path is required"} try: result = await self._validate_code(Path(target_path), validation_type) return {"success": True, "data": result} except Exception as e: logger.error(f"CodeValidator operation failed: {e}") return {"success": False, "error": str(e)} async def _validate_code(self, path: Path, validation_type: str) -> Dict[str, Any]: """Perform code validation""" validation_results = { "path": str(path), "timestamp": datetime.now().isoformat(), "validations": [], "summary": { "total_issues": 0, "error_count": 0, "warning_count": 0, "style_count": 0 } } try: # Read file content with tokenize.open(path) as f: content = f.read() # Basic syntax check try: ast.parse(content) validation_results["validations"].append({ "type": "syntax", "status": "passed", "message": "Syntax is valid" }) except SyntaxError as e: validation_results["validations"].append({ "type": "syntax", "status": "failed", "message": str(e), "line": e.lineno, "severity": "error" }) validation_results["summary"]["error_count"] += 1 # Style checks style_issues = self._check_style(content) validation_results["validations"].extend(style_issues) 
validation_results["summary"]["style_count"] += len(style_issues) # Security checks security_issues = self._check_security(content) validation_results["validations"].extend(security_issues) for issue in security_issues: if issue.get("severity") == "error": validation_results["summary"]["error_count"] += 1 else: validation_results["summary"]["warning_count"] += 1 # Complexity checks if validation_type in ['all', 'complexity']: complexity_issues = self._check_complexity(content) validation_results["validations"].extend(complexity_issues) validation_results["summary"]["warning_count"] += len(complexity_issues) validation_results["summary"]["total_issues"] = ( validation_results["summary"]["error_count"] + validation_results["summary"]["warning_count"] + validation_results["summary"]["style_count"] ) return validation_results except Exception as e: raise RuntimeError(f"Validation failed: {e}") def _check_style(self, content: str) -> List[Dict[str, Any]]: """Check code style issues""" issues = [] lines = content.splitlines() for i, line in enumerate(lines, 1): # Line length check if len(line) > 79: issues.append({ "type": "style", "rule": "line_length", "message": "Line too long", "line": i, "severity": "style" }) # Indentation check if line.strip() and (len(line) - len(line.lstrip())) % 4 != 0: issues.append({ "type": "style", "rule": "indentation", "message": "Incorrect indentation", "line": i, "severity": "style" }) # Trailing whitespace if line.rstrip() != line: issues.append({ "type": "style", "rule": "trailing_whitespace", "message": "Trailing whitespace", "line": i, "severity": "style" }) return issues def _check_security(self, content: str) -> List[Dict[str, Any]]: """Check security issues""" issues = [] # Dangerous patterns dangerous_patterns = { r"eval\(": "Use of eval() is dangerous", r"exec\(": "Use of exec() is dangerous", r"os\.system\(": "Use subprocess module instead of os.system", r"subprocess\.call\(.*shell=True": "shell=True can be dangerous", r"pickle\.load": "Pickle can be dangerous with untrusted data" } for pattern, message in dangerous_patterns.items(): matches = re.finditer(pattern, content) for match in matches: line_no = content.count('\n', 0, match.start()) + 1 issues.append({ "type": "security", "rule": pattern.replace("\\", ""), "message": message, "line": line_no, "severity": "error" }) return issues def _check_complexity(self, content: str) -> List[Dict[str, Any]]: """Check code complexity issues""" issues = [] try: tree = ast.parse(content) for node in ast.walk(tree): if isinstance(node, ast.FunctionDef): complexity = self._calculate_complexity(node) if complexity > 10: issues.append({ "type": "complexity", "rule": "cyclomatic_complexity", "message": f"Function too complex (score: {complexity})", "line": node.lineno, "severity": "warning", "details": { "complexity_score": complexity, "function_name": node.name } }) except Exception as e: logger.error(f"Complexity check failed: {e}") return issues def _calculate_complexity(self, node: ast.AST) -> int: """Calculate cyclomatic complexity""" complexity = 1 for child in ast.walk(node): if isinstance(child, (ast.If, ast.While, ast.For, ast.ExceptHandler)): complexity += 1 elif isinstance(child, ast.BoolOp): complexity += len(child.values) - 1 return complexity class SyntaxChecker(BaseTool): """Advanced syntax checking and analysis tool""" async def execute(self, arguments: Dict[str, Any]) -> Dict[str, Any]: target_path = arguments.get('path') check_type = arguments.get('check_type', 'all') language = 
        language = arguments.get('language', 'python')

        if not target_path:
            return {"error": "Path is required"}

        try:
            result = await self._analyze_syntax(Path(target_path), check_type, language)
            return {"success": True, "data": result}
        except Exception as e:
            logger.error(f"SyntaxChecker operation failed: {e}")
            return {"success": False, "error": str(e)}

    async def _analyze_syntax(self, path: Path, check_type: str, language: str) -> Dict[str, Any]:
        """Analyze code syntax"""
        if not path.exists():
            raise FileNotFoundError(f"File not found: {path}")
        if language.lower() not in ['python', 'python3']:
            raise ValueError("Only Python language is supported")

        try:
            with tokenize.open(path) as f:
                content = f.read()

            analysis_result = {
                "file": str(path),
                "language": language,
                "timestamp": datetime.now().isoformat(),
                "syntax_valid": False,
                "details": []
            }

            # Basic syntax check
            try:
                tree = ast.parse(content)
                analysis_result["syntax_valid"] = True
            except SyntaxError as e:
                analysis_result["details"].append({
                    "type": "error",
                    "code": "syntax_error",
                    "message": str(e),
                    "line": e.lineno,
                    "offset": e.offset,
                    "text": e.text
                })
                return analysis_result

            # If syntax is valid, perform detailed analysis
            if check_type in ['all', 'tokens']:
                analysis_result["token_analysis"] = await self._analyze_tokens(content)
            if check_type in ['all', 'ast']:
                analysis_result["ast_analysis"] = await self._analyze_ast(tree)
            if check_type in ['all', 'imports']:
                analysis_result["import_analysis"] = await self._analyze_imports(tree)
            if check_type in ['all', 'naming']:
                analysis_result["naming_analysis"] = await self._analyze_naming(tree)

            return analysis_result
        except Exception as e:
            raise RuntimeError(f"Syntax analysis failed: {e}")

    async def _analyze_tokens(self, content: str) -> Dict[str, Any]:
        """Analyze code tokens"""
        token_analysis = {
            "token_count": 0,
            "token_types": {},
            "line_continuations": 0,
            "string_literals": 0,
            "tokens_by_line": {},
            "issues": []
        }
        try:
            # Create a string buffer and tokenize it
            buffer = io.StringIO(content)
            tokens = list(tokenize.generate_tokens(buffer.readline))

            current_line = 1
            line_tokens = []
            for token in tokens:
                token_type = tokenize.tok_name[token.type]
                token_analysis["token_count"] += 1
                token_analysis["token_types"][token_type] = \
                    token_analysis["token_types"].get(token_type, 0) + 1

                # Track tokens by line
                if token.start[0] != current_line:
                    if line_tokens:
                        token_analysis["tokens_by_line"][current_line] = line_tokens
                    current_line = token.start[0]
                    line_tokens = []
                line_tokens.append(token_type)

                # Check for specific tokens
                if token.type == tokenize.STRING:
                    token_analysis["string_literals"] += 1
                elif token.type == tokenize.NL:
                    token_analysis["line_continuations"] += 1

                # Check for potential issues
                if token.type == tokenize.OP and token.string == ';':
                    token_analysis["issues"].append({
                        "type": "style",
                        "message": "Semicolon found; multiple statements on one line",
                        "line": token.start[0]
                    })

            return token_analysis
        except Exception as e:
            logger.error(f"Token analysis failed: {e}")
            return {"error": str(e)}

    async def _analyze_ast(self, tree: ast.AST) -> Dict[str, Any]:
        """Analyze AST structure"""
        ast_analysis = {
            "node_types": {},
            "depth": 0,
            "branches": [],
            "complexity": {
                "functions": [],
                "classes": []
            }
        }

        def analyze_node(node, depth=0):
            node_type = type(node).__name__
            ast_analysis["node_types"][node_type] = \
                ast_analysis["node_types"].get(node_type, 0) + 1
            ast_analysis["depth"] = max(ast_analysis["depth"], depth)

            # Analyze functions
            if isinstance(node, ast.FunctionDef):
len(node.args.args), "decorators": len(node.decorator_list), "complexity": self._calculate_node_complexity(node) } ast_analysis["complexity"]["functions"].append(function_info) # Analyze classes elif isinstance(node, ast.ClassDef): class_info = { "name": node.name, "line": node.lineno, "bases": len(node.bases), "methods": len([n for n in node.body if isinstance(n, ast.FunctionDef)]), "complexity": self._calculate_node_complexity(node) } ast_analysis["complexity"]["classes"].append(class_info) # Analyze branching elif isinstance(node, (ast.If, ast.For, ast.While)): branch_info = { "type": type(node).__name__, "line": node.lineno, "depth": depth } ast_analysis["branches"].append(branch_info) for child in ast.iter_child_nodes(node): analyze_node(child, depth + 1) analyze_node(tree) return ast_analysis async def _analyze_imports(self, tree: ast.AST) -> Dict[str, Any]: """Analyze import statements""" import_analysis = { "imports": [], "from_imports": [], "stats": { "total_imports": 0, "unique_modules": set(), "relative_imports": 0 } } for node in ast.walk(tree): if isinstance(node, ast.Import): for name in node.names: import_info = { "name": name.name, "alias": name.asname, "line": node.lineno } import_analysis["imports"].append(import_info) import_analysis["stats"]["unique_modules"].add(name.name.split('.')[0]) import_analysis["stats"]["total_imports"] += 1 elif isinstance(node, ast.ImportFrom): if node.module: for name in node.names: import_info = { "module": node.module, "name": name.name, "alias": name.asname, "level": node.level, "line": node.lineno } import_analysis["from_imports"].append(import_info) import_analysis["stats"]["unique_modules"].add(node.module.split('.')[0]) import_analysis["stats"]["total_imports"] += 1 if node.level > 0: import_analysis["stats"]["relative_imports"] += 1 import_analysis["stats"]["unique_modules"] = list(import_analysis["stats"]["unique_modules"]) return import_analysis async def _analyze_naming(self, tree: ast.AST) -> Dict[str, Any]: """Analyze naming conventions""" naming_analysis = { "conventions": { "snake_case": [], "camel_case": [], "non_conventional": [] }, "issues": [], "suggestions": [] } def classify_name(name: str, node_type: str, line: int): if re.match(r'^[a-z][a-z0-9_]*$', name): naming_analysis["conventions"]["snake_case"].append({ "name": name, "type": node_type, "line": line }) elif re.match(r'^[A-Z][a-zA-Z0-9]*$', name): naming_analysis["conventions"]["camel_case"].append({ "name": name, "type": node_type, "line": line }) else: naming_analysis["conventions"]["non_conventional"].append({ "name": name, "type": node_type, "line": line }) # Generate suggestion if node_type == "class" and not name[0].isupper(): naming_analysis["issues"].append({ "type": "naming_convention", "message": f"Class name '{name}' should use CamelCase", "line": line }) naming_analysis["suggestions"].append({ "original": name, "suggested": name[0].upper() + name[1:], "line": line }) elif node_type == "function" and not name.islower(): snake_case = ''.join(['_' + c.lower() if c.isupper() else c.lower() for c in name]).lstrip('_') naming_analysis["issues"].append({ "type": "naming_convention", "message": f"Function name '{name}' should use snake_case", "line": line }) naming_analysis["suggestions"].append({ "original": name, "suggested": snake_case, "line": line }) for node in ast.walk(tree): if isinstance(node, ast.ClassDef): classify_name(node.name, "class", node.lineno) elif isinstance(node, ast.FunctionDef): classify_name(node.name, "function", node.lineno) elif 
            elif isinstance(node, ast.Name) and isinstance(node.ctx, ast.Store):
                classify_name(node.id, "variable", node.lineno)

        return naming_analysis

    def _calculate_node_complexity(self, node: ast.AST) -> int:
        """Calculate complexity of an AST node"""
        complexity = 1
        for child in ast.walk(node):
            if isinstance(child, (ast.If, ast.While, ast.For, ast.ExceptHandler)):
                complexity += 1
            elif isinstance(child, ast.BoolOp):
                complexity += len(child.values) - 1
        return complexity
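
A minimal usage sketch, assuming `BaseTool` can be instantiated without constructor arguments and that it supplies the `_normalize_path`, `_validate_path`, and caching helpers used above; the `./src` path and the `asyncio.run` entry point are illustrative only, not part of the MCP server wiring.

import asyncio

# Hypothetical local test drive of two of the tools defined above (not the MCP server entry point).
async def main() -> None:
    structure = await CodeStructureAnalyzer().execute({"path": "./src"})
    complexity = await ProjectAnalyzer().execute({"operation": "complexity", "path": "./src"})
    print(structure.get("metrics"))
    print(complexity.get("data", {}).get("hotspots"))

if __name__ == "__main__":
    asyncio.run(main())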