codebase_optimizer_engine.py (45.3 kB)
#!/usr/bin/env python3
"""
CodeBase Optimizer Engine
=========================

A sophisticated analysis engine for project validation, duplicate detection,
and optimization recommendations. Designed to learn and improve with each use.

Author: AI Assistant
License: MIT
"""

import os
import json
import sqlite3
import hashlib
import re
import ast
from pathlib import Path
from typing import Dict, List, Optional, Any
from dataclasses import dataclass
from datetime import datetime
import logging

# Setup logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


@dataclass
class AnalysisResult:
    """Structured result for analysis operations"""
    success: bool
    data: Dict[str, Any]
    errors: List[str]
    warnings: List[str]
    recommendations: List[str]
    confidence: float


@dataclass
class ProjectMetrics:
    """Project health metrics"""
    total_files: int
    lines_of_code: int
    duplicated_lines: int
    complexity_score: float
    security_score: float
    organization_score: float
    duplication_percentage: float


class PatternDatabase:
    """SQLite database for learning and storing patterns"""

    def __init__(self, db_path: str = "codebase_patterns.db"):
        self.db_path = db_path
        self.init_database()

    def init_database(self):
        """Initialize the pattern database"""
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()

        # Projects table
        cursor.execute('''
            CREATE TABLE IF NOT EXISTS projects (
                id INTEGER PRIMARY KEY,
                name TEXT UNIQUE,
                path TEXT,
                type TEXT,
                last_analyzed TIMESTAMP,
                metrics TEXT
            )
        ''')

        # Patterns table
        cursor.execute('''
            CREATE TABLE IF NOT EXISTS patterns (
                id INTEGER PRIMARY KEY,
                pattern_type TEXT,
                pattern_hash TEXT UNIQUE,
                pattern_data TEXT,
                frequency INTEGER DEFAULT 1,
                effectiveness_score REAL DEFAULT 0.5,
                first_seen TIMESTAMP,
                last_seen TIMESTAMP
            )
        ''')

        # Improvements table
        cursor.execute('''
            CREATE TABLE IF NOT EXISTS improvements (
                id INTEGER PRIMARY KEY,
                project_id INTEGER,
                improvement_type TEXT,
                before_metrics TEXT,
                after_metrics TEXT,
                success_score REAL,
                applied_date TIMESTAMP,
                FOREIGN KEY (project_id) REFERENCES projects (id)
            )
        ''')

        conn.commit()
        conn.close()

    def record_pattern(self, pattern_type: str, pattern_data: Dict) -> None:
        """Record a detected pattern, bumping its frequency if already known"""
        pattern_hash = hashlib.md5(str(pattern_data).encode()).hexdigest()

        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()
        cursor.execute('''
            INSERT OR REPLACE INTO patterns
            (pattern_type, pattern_hash, pattern_data, frequency, last_seen)
            VALUES (?, ?, ?,
                    COALESCE((SELECT frequency FROM patterns WHERE pattern_hash = ?), 0) + 1,
                    ?)
        ''', (pattern_type, pattern_hash, json.dumps(pattern_data), pattern_hash, datetime.now()))
        conn.commit()
        conn.close()
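    # Illustrative usage sketch for PatternDatabase (the path and payload
    # below are made-up examples, not part of the engine itself):
    #
    #   db = PatternDatabase("codebase_patterns.db")
    #   db.record_pattern("project_structure", {"architecture_type": "monolith"})
    #   top = db.get_learned_patterns("project_structure")  # most frequent first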
    def get_learned_patterns(self, pattern_type: Optional[str] = None) -> List[Dict]:
        """Get learned patterns, optionally filtered by type"""
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()

        if pattern_type:
            cursor.execute('SELECT * FROM patterns WHERE pattern_type = ? ORDER BY frequency DESC',
                           (pattern_type,))
        else:
            cursor.execute('SELECT * FROM patterns ORDER BY frequency DESC')

        # Capture column names before closing the connection;
        # cursor.description is not reliable after close.
        columns = [col[0] for col in cursor.description]
        patterns = cursor.fetchall()
        conn.close()

        return [dict(zip(columns, row)) for row in patterns]


class CodebaseAnalyzer:
    """Main analysis engine for codebase optimization"""

    def __init__(self, project_path: str):
        self.project_path = Path(project_path).resolve()
        self.pattern_db = PatternDatabase()
        self.supported_languages = {
            '.py': 'python', '.js': 'javascript', '.ts': 'typescript',
            '.java': 'java', '.go': 'go', '.rs': 'rust', '.cpp': 'cpp',
            '.c': 'c', '.php': 'php', '.rb': 'ruby'
        }
        self.results = {}

    def analyze_project_structure(self) -> AnalysisResult:
        """Analyze overall project structure and organization"""
        logger.info(f"Analyzing project structure: {self.project_path}")

        try:
            structure_data = {
                'root_path': str(self.project_path),
                'directory_tree': self._build_directory_tree(),
                'file_distribution': self._analyze_file_distribution(),
                'naming_conventions': self._analyze_naming_conventions(),
                'architecture_type': self._detect_architecture_type(),
                'organization_score': 0.0
            }

            # Calculate organization score
            structure_data['organization_score'] = self._calculate_organization_score(structure_data)

            # Record patterns
            self.pattern_db.record_pattern('project_structure', structure_data)

            recommendations = self._generate_structure_recommendations(structure_data)

            return AnalysisResult(
                success=True,
                data=structure_data,
                errors=[],
                warnings=[],
                recommendations=recommendations,
                confidence=0.9
            )
        except Exception as e:
            logger.error(f"Structure analysis failed: {e}")
            return AnalysisResult(
                success=False, data={}, errors=[str(e)], warnings=[],
                recommendations=[], confidence=0.0
            )

    def detect_code_duplicates(self, languages: Optional[List[str]] = None) -> AnalysisResult:
        """Detect code duplications across the project"""
        logger.info("Detecting code duplicates")

        try:
            if languages is None:
                languages = list(self.supported_languages.values())

            duplicate_data = {
                'function_duplicates': self._find_function_duplicates(languages),
                'code_block_duplicates': self._find_code_block_duplicates(languages),
                'import_duplicates': self._find_import_duplicates(languages),
                'configuration_duplicates': self._find_config_duplicates(),
                'total_duplicated_lines': 0,
                'duplication_percentage': 0.0
            }

            # Calculate duplication metrics
            total_lines = self._count_total_lines()
            duplicated_lines = self._count_duplicated_lines(duplicate_data)
            duplicate_data['total_duplicated_lines'] = duplicated_lines
            duplicate_data['duplication_percentage'] = (
                duplicated_lines / total_lines * 100) if total_lines > 0 else 0

            # Record patterns
            self.pattern_db.record_pattern('code_duplicates', duplicate_data)

            recommendations = self._generate_duplicate_recommendations(duplicate_data)

            return AnalysisResult(
                success=True,
                data=duplicate_data,
                errors=[],
                warnings=[],
                recommendations=recommendations,
                confidence=0.85
            )
        except Exception as e:
            logger.error(f"Duplicate detection failed: {e}")
            return AnalysisResult(
                success=False, data={}, errors=[str(e)], warnings=[],
                recommendations=[], confidence=0.0
            )
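    # Illustrative driver sketch (the project path is hypothetical):
    #
    #   analyzer = CodebaseAnalyzer("/path/to/project")
    #   structure = analyzer.analyze_project_structure()
    #   dupes = analyzer.detect_code_duplicates(languages=["python"])
    #   if structure.success:
    #       print(structure.data["organization_score"])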
    def validate_microservices_architecture(self) -> AnalysisResult:
        """Validate microservices architecture patterns"""
        logger.info("Validating microservices architecture")

        try:
            microservices_data = {
                'is_microservices': self._is_microservices_architecture(),
                'services_detected': self._detect_services(),
                'service_dependencies': self._analyze_service_dependencies(),
                'database_independence': self._check_database_independence(),
                'api_consistency': self._check_api_consistency(),
                'configuration_management': self._analyze_config_management(),
                'code_smells': self._detect_microservice_code_smells()
            }

            # Record patterns
            self.pattern_db.record_pattern('microservices_analysis', microservices_data)

            recommendations = self._generate_microservices_recommendations(microservices_data)

            return AnalysisResult(
                success=True,
                data=microservices_data,
                errors=[],
                warnings=[],
                recommendations=recommendations,
                confidence=0.8
            )
        except Exception as e:
            logger.error(f"Microservices validation failed: {e}")
            return AnalysisResult(
                success=False, data={}, errors=[str(e)], warnings=[],
                recommendations=[], confidence=0.0
            )

    def optimize_configurations(self) -> AnalysisResult:
        """Analyze and optimize configuration patterns"""
        logger.info("Optimizing configurations")

        try:
            config_data = {
                'config_files': self._find_config_files(),
                'environment_variables': self._analyze_env_variables(),
                'duplicate_configs': self._find_duplicate_configs(),
                'security_issues': self._check_config_security(),
                'startup_scripts': self._analyze_startup_scripts()
            }

            # Record patterns
            self.pattern_db.record_pattern('configuration_optimization', config_data)

            recommendations = self._generate_config_recommendations(config_data)

            return AnalysisResult(
                success=True,
                data=config_data,
                errors=[],
                warnings=[],
                recommendations=recommendations,
                confidence=0.85
            )
        except Exception as e:
            logger.error(f"Configuration optimization failed: {e}")
            return AnalysisResult(
                success=False, data={}, errors=[str(e)], warnings=[],
                recommendations=[], confidence=0.0
            )

    def generate_improvement_report(self, analysis_results: Dict[str, AnalysisResult]) -> Dict[str, Any]:
        """Generate comprehensive improvement report"""
        logger.info("Generating improvement report")

        # Calculate overall metrics
        metrics = self._calculate_project_metrics(analysis_results)

        # Prioritize recommendations
        all_recommendations = []
        for result in analysis_results.values():
            all_recommendations.extend(result.recommendations)
        prioritized_recommendations = self._prioritize_recommendations(all_recommendations)

        # Generate action plan
        action_plan = self._generate_action_plan(prioritized_recommendations)

        report = {
            'project_path': str(self.project_path),
            'analysis_date': datetime.now().isoformat(),
            'metrics': metrics.__dict__,
            'analysis_results': {
                name: {
                    'success': result.success,
                    'confidence': result.confidence,
                    'errors': result.errors,
                    'warnings': result.warnings,
                    'recommendations_count': len(result.recommendations)
                }
                for name, result in analysis_results.items()
            },
            'prioritized_recommendations': prioritized_recommendations,
            'action_plan': action_plan,
            'health_score': self._calculate_health_score(metrics),
            'improvement_potential': self._calculate_improvement_potential(metrics)
        }

        return report

    # Helper methods for analysis

    def _build_directory_tree(self) -> Dict:
        """Build directory tree structure"""
        tree = {}
        for root, dirs, files in os.walk(self.project_path):
            # Skip hidden directories and common ignore patterns
            dirs[:] = [d for d in dirs
                       if not d.startswith('.') and d not in ['node_modules', '__pycache__', 'venv']]

            relative_root = os.path.relpath(root, self.project_path)
            if relative_root == '.':
                relative_root = ''

            tree[relative_root] = {
                'directories': dirs.copy(),
                'files': files,
                'file_count': len(files)
            }
        return tree
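    # Sketch of the report returned by generate_improvement_report above
    # (keys from its construction; the values shown are invented examples):
    #
    #   {
    #       "health_score": 72.5,
    #       "metrics": {"total_files": 120, "duplication_percentage": 4.2, ...},
    #       "prioritized_recommendations": [{"recommendation": "...", "priority": "high", ...}],
    #       "action_plan": [{"phase": "Critical Issues", "estimated_time": "1-2 days", ...}],
    #   }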
    def _analyze_file_distribution(self) -> Dict:
        """Analyze file type distribution"""
        distribution = {}
        total_files = 0

        for root, _, files in os.walk(self.project_path):
            for file in files:
                ext = Path(file).suffix.lower()
                distribution[ext] = distribution.get(ext, 0) + 1
                total_files += 1

        return {
            'by_extension': distribution,
            'total_files': total_files,
            'languages_detected': [self.supported_languages.get(ext, 'unknown')
                                   for ext in distribution.keys()
                                   if ext in self.supported_languages]
        }

    def _analyze_naming_conventions(self) -> Dict:
        """Analyze naming convention consistency"""
        conventions = {
            'snake_case': 0,
            'kebab_case': 0,
            'camelCase': 0,
            'PascalCase': 0,
            'mixed': 0
        }

        for root, dirs, files in os.walk(self.project_path):
            for name in dirs + files:
                name_no_ext = Path(name).stem
                if re.match(r'^[a-z]+(_[a-z]+)*$', name_no_ext):
                    conventions['snake_case'] += 1
                elif re.match(r'^[a-z]+(-[a-z]+)*$', name_no_ext):
                    conventions['kebab_case'] += 1
                elif re.match(r'^[a-z][a-zA-Z]*$', name_no_ext):
                    conventions['camelCase'] += 1
                elif re.match(r'^[A-Z][a-zA-Z]*$', name_no_ext):
                    conventions['PascalCase'] += 1
                else:
                    conventions['mixed'] += 1

        return conventions

    def _detect_architecture_type(self) -> str:
        """Detect project architecture type"""
        # Look for microservices indicators
        has_docker = any(self.project_path.glob('**/Dockerfile'))
        has_docker_compose = any(self.project_path.glob('**/docker-compose.yml'))
        has_multiple_services = len([
            d for d in os.listdir(self.project_path)
            if os.path.isdir(os.path.join(self.project_path, d)) and not d.startswith('.')
        ]) > 3

        if has_docker and has_docker_compose and has_multiple_services:
            return 'microservices'
        elif has_multiple_services:
            return 'multi-module'
        else:
            return 'monolith'

    def _calculate_organization_score(self, structure_data: Dict) -> float:
        """Calculate project organization score (0-1)"""
        score = 0.5  # Base score

        # Naming consistency bonus
        conventions = structure_data.get('naming_conventions', {})
        total_names = sum(conventions.values())
        if total_names > 0:
            dominant_convention = max(conventions.values())
            consistency_ratio = dominant_convention / total_names
            score += consistency_ratio * 0.3

        # Architecture clarity bonus
        if structure_data.get('architecture_type') in ['microservices', 'multi-module']:
            score += 0.2

        return min(score, 1.0)

    def _find_function_duplicates(self, languages: List[str]) -> List[Dict]:
        """Find duplicate functions across the codebase"""
        functions = {}
        duplicates = []

        for root, _, files in os.walk(self.project_path):
            for file in files:
                ext = Path(file).suffix.lower()
                if ext in self.supported_languages and self.supported_languages[ext] in languages:
                    file_path = os.path.join(root, file)
                    try:
                        if ext == '.py':
                            file_functions = self._extract_python_functions(file_path)
                        elif ext in ['.js', '.ts']:
                            file_functions = self._extract_js_functions(file_path)
                        else:
                            continue

                        for func_name, func_content in file_functions.items():
                            func_hash = hashlib.md5(func_content.encode()).hexdigest()
                            if func_hash in functions:
                                # Found duplicate
                                duplicates.append({
                                    'function_name': func_name,
                                    'files': [functions[func_hash]['file'], file_path],
                                    'content_hash': func_hash,
                                    'similarity': 1.0
                                })
                            else:
                                functions[func_hash] = {
                                    'name': func_name,
                                    'file': file_path,
                                    'content': func_content
                                }
                    except Exception as e:
                        logger.warning(f"Could not analyze {file_path}: {e}")

        return duplicates

    def _extract_python_functions(self, file_path: str) -> Dict[str, str]:
        """Extract Python function definitions"""
        functions = {}
        try:
            with open(file_path, 'r', encoding='utf-8') as f:
                content = f.read()

            tree = ast.parse(content)
            for node in ast.walk(tree):
                if isinstance(node, ast.FunctionDef):
                    # node.end_lineno requires Python 3.8+
                    func_lines = content.split('\n')[node.lineno - 1:node.end_lineno]
                    functions[node.name] = '\n'.join(func_lines)
        except Exception:
            pass
        return functions
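    # Worked example for _calculate_organization_score (invented numbers):
    # with 80 of 100 names in snake_case and a 'multi-module' architecture,
    # score = 0.5 + (80 / 100) * 0.3 + 0.2 = 0.94.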
    def _extract_js_functions(self, file_path: str) -> Dict[str, str]:
        """Extract JavaScript/TypeScript function definitions"""
        functions = {}
        try:
            with open(file_path, 'r', encoding='utf-8') as f:
                content = f.read()

            # Simple regex-based extraction for common function patterns;
            # note these will not match bodies containing nested braces.
            patterns = [
                r'function\s+(\w+)\s*\([^)]*\)\s*{[^}]*}',
                r'const\s+(\w+)\s*=\s*\([^)]*\)\s*=>\s*{[^}]*}',
                r'(\w+)\s*:\s*function\s*\([^)]*\)\s*{[^}]*}',
            ]

            for pattern in patterns:
                for match in re.finditer(pattern, content, re.MULTILINE | re.DOTALL):
                    functions[match.group(1)] = match.group(0)
        except Exception:
            pass
        return functions

    def _find_code_block_duplicates(self, languages: List[str]) -> List[Dict]:
        """Find duplicate code blocks"""
        # Simplified implementation - could be enhanced with AST analysis
        duplicates = []
        blocks = {}

        for root, _, files in os.walk(self.project_path):
            for file in files:
                ext = Path(file).suffix.lower()
                if ext in self.supported_languages and self.supported_languages[ext] in languages:
                    file_path = os.path.join(root, file)
                    try:
                        with open(file_path, 'r', encoding='utf-8') as f:
                            lines = f.readlines()

                        # Look for blocks of 5+ similar lines
                        for i in range(len(lines) - 4):
                            block = ''.join(lines[i:i + 5]).strip()
                            if len(block) > 100:  # Meaningful block
                                block_hash = hashlib.md5(block.encode()).hexdigest()
                                if block_hash in blocks:
                                    duplicates.append({
                                        'files': [blocks[block_hash]['file'], file_path],
                                        'line_ranges': [blocks[block_hash]['lines'], f"{i + 1}-{i + 5}"],
                                        'block_size': 5,
                                        'content_hash': block_hash
                                    })
                                else:
                                    blocks[block_hash] = {
                                        'file': file_path,
                                        'lines': f"{i + 1}-{i + 5}",
                                        'content': block
                                    }
                    except Exception:
                        pass

        return duplicates

    def _find_import_duplicates(self, languages: List[str]) -> List[Dict]:
        """Find duplicate import patterns"""
        imports = {}
        duplicates = []

        for root, _, files in os.walk(self.project_path):
            for file in files:
                ext = Path(file).suffix.lower()
                if ext in self.supported_languages and self.supported_languages[ext] in languages:
                    file_path = os.path.join(root, file)
                    try:
                        with open(file_path, 'r', encoding='utf-8') as f:
                            content = f.read()

                        file_imports = []
                        if ext == '.py':
                            file_imports = re.findall(r'^(?:from\s+\S+\s+)?import\s+.+$',
                                                      content, re.MULTILINE)
                        elif ext in ['.js', '.ts']:
                            file_imports = re.findall(r'^import\s+.+$', content, re.MULTILINE)

                        for imp in file_imports:
                            imports.setdefault(imp.strip(), []).append(file_path)
                    except Exception:
                        pass

        # Find imports used in multiple files
        for imp, using_files in imports.items():
            if len(using_files) > 1:
                duplicates.append({
                    'import_statement': imp,
                    'files': using_files,
                    'usage_count': len(using_files)
                })

        return duplicates
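    # Example of a _find_code_block_duplicates hit (hypothetical files): if
    # lines 11-15 of a.py and b.py are identical and longer than 100 chars
    # when stripped, the entry records line_ranges ["11-15", "11-15"] and
    # block_size 5.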
    def _find_config_duplicates(self) -> List[Dict]:
        """Find duplicate configuration patterns"""
        config_files = ['.env', '.env.local', '.env.example', 'config.json',
                        'package.json', 'requirements.txt']
        configs = {}
        duplicates = []

        for root, _, files in os.walk(self.project_path):
            for file in files:
                if file in config_files or file.endswith('.conf') or file.endswith('.config'):
                    file_path = os.path.join(root, file)
                    try:
                        with open(file_path, 'r', encoding='utf-8') as f:
                            content = f.read()

                        # Compare configuration files by their sorted, comment-free lines
                        lines = [line.strip() for line in content.split('\n')
                                 if line.strip() and not line.startswith('#')]
                        config_hash = hashlib.md5('\n'.join(sorted(lines)).encode()).hexdigest()

                        if config_hash in configs:
                            duplicates.append({
                                'files': [configs[config_hash], file_path],
                                'similarity': 1.0,
                                'type': 'configuration'
                            })
                        else:
                            configs[config_hash] = file_path
                    except Exception:
                        pass

        return duplicates

    def _count_total_lines(self) -> int:
        """Count total lines of code"""
        total = 0
        for root, _, files in os.walk(self.project_path):
            for file in files:
                ext = Path(file).suffix.lower()
                if ext in self.supported_languages:
                    file_path = os.path.join(root, file)
                    try:
                        with open(file_path, 'r', encoding='utf-8') as f:
                            total += len(f.readlines())
                    except Exception:
                        pass
        return total

    def _count_duplicated_lines(self, duplicate_data: Dict) -> int:
        """Count total duplicated lines"""
        duplicated = 0

        # Count from function duplicates (rough estimate of 10 lines each)
        duplicated += len(duplicate_data.get('function_duplicates', [])) * 10

        # Count from code block duplicates
        for block in duplicate_data.get('code_block_duplicates', []):
            duplicated += block.get('block_size', 5)

        return duplicated

    def _is_microservices_architecture(self) -> bool:
        """Check if project follows microservices architecture"""
        return self._detect_architecture_type() == 'microservices'

    def _detect_services(self) -> List[Dict]:
        """Detect individual services in the project"""
        services = []

        for item in os.listdir(self.project_path):
            item_path = os.path.join(self.project_path, item)
            if os.path.isdir(item_path) and not item.startswith('.'):
                # Check if directory looks like a service
                has_main_file = any(os.path.exists(os.path.join(item_path, f))
                                    for f in ['main.py', 'app.py', 'server.js', 'index.js', 'main.go'])
                has_config = any(os.path.exists(os.path.join(item_path, f))
                                 for f in ['requirements.txt', 'package.json', 'go.mod'])

                if has_main_file or has_config:
                    services.append({
                        'name': item,
                        'path': item_path,
                        'has_main_file': has_main_file,
                        'has_config': has_config,
                        'estimated_language': self._detect_service_language(item_path)
                    })

        return services

    def _detect_service_language(self, service_path: str) -> str:
        """Detect primary language of a service"""
        file_counts = {}
        for root, _, files in os.walk(service_path):
            for file in files:
                ext = Path(file).suffix.lower()
                if ext in self.supported_languages:
                    lang = self.supported_languages[ext]
                    file_counts[lang] = file_counts.get(lang, 0) + 1

        return max(file_counts.items(), key=lambda x: x[1])[0] if file_counts else 'unknown'

    def _analyze_service_dependencies(self) -> Dict:
        """Analyze dependencies between services"""
        # Simplified implementation - returns an empty dependency graph
        return {
            'internal_dependencies': [],
            'external_dependencies': [],
            'circular_dependencies': [],
            'dependency_graph': {}
        }

    def _check_database_independence(self) -> Dict:
        """Check if services have independent databases"""
        # Look for database configuration patterns (placeholder defaults)
        return {
            'independent_databases': True,
            'shared_database_detected': False,
            'database_configs': []
        }

    def _check_api_consistency(self) -> Dict:
        """Check API consistency across services"""
        # Look for API endpoint patterns (placeholder defaults)
        return {
            'consistent_error_handling': True,
            'consistent_response_format': True,
            'api_versioning': False,
            'endpoint_patterns': []
        }

    def _analyze_config_management(self) -> Dict:
        """Analyze configuration management patterns"""
        config_files = []
        for root, _, files in os.walk(self.project_path):
            for file in files:
                if file.startswith('.env') or 'config' in file.lower():
                    config_files.append(os.path.join(root, file))

        return {
            'config_files': config_files,
            'centralized_config': len(config_files) <= 3,
            'environment_separation': any('.env.local' in f or '.env.dev' in f
                                          for f in config_files)
        }
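    # Illustrative _detect_services output for a repo with an "auth/" directory
    # holding app.py and requirements.txt (hypothetical layout):
    #
    #   [{"name": "auth", "path": ".../auth", "has_main_file": True,
    #     "has_config": True, "estimated_language": "python"}]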
    def _detect_microservice_code_smells(self) -> List[Dict]:
        """Detect microservice-specific code smells"""
        code_smells = []

        # Example code smells for microservices (detection not yet implemented)
        services = self._detect_services()
        if len(services) > 1:
            # Check for shared database smell
            # Check for chatty communication smell
            # Check for distributed monolith smell
            pass

        return code_smells

    def _find_config_files(self) -> List[str]:
        """Find all configuration files"""
        config_patterns = ['*.env*', '*.conf', '*.config', '*.json', '*.yml', '*.yaml', '*.ini']
        config_files = []

        for pattern in config_patterns:
            config_files.extend(self.project_path.glob(f'**/{pattern}'))

        return [str(f) for f in config_files]

    def _analyze_env_variables(self) -> Dict:
        """Analyze environment variable usage"""
        env_vars = set()
        env_files = []

        for root, _, files in os.walk(self.project_path):
            for file in files:
                if file.startswith('.env'):
                    file_path = os.path.join(root, file)
                    env_files.append(file_path)
                    try:
                        with open(file_path, 'r') as f:
                            for line in f:
                                if '=' in line and not line.strip().startswith('#'):
                                    var_name = line.split('=')[0].strip()
                                    env_vars.add(var_name)
                    except Exception:
                        pass

        return {
            'env_files': env_files,
            'total_variables': len(env_vars),
            'variables': list(env_vars)
        }

    def _find_duplicate_configs(self) -> List[Dict]:
        """Find duplicate configuration entries"""
        # Implementation would compare configuration files
        return []

    def _check_config_security(self) -> List[Dict]:
        """Check for security issues in configuration"""
        security_issues = []

        for root, _, files in os.walk(self.project_path):
            for file in files:
                if file.startswith('.env') and not file.endswith('.example'):
                    file_path = os.path.join(root, file)
                    try:
                        with open(file_path, 'r') as f:
                            content = f.read()
                        if 'password' in content.lower() or 'secret' in content.lower():
                            security_issues.append({
                                'file': file_path,
                                'issue': 'Potential secrets in configuration file',
                                'severity': 'high'
                            })
                    except Exception:
                        pass

        return security_issues

    def _analyze_startup_scripts(self) -> Dict:
        """Analyze startup scripts and procedures"""
        startup_files = []
        for file in ['start.sh', 'start-local.sh', 'run.sh', 'docker-compose.yml', 'Dockerfile']:
            file_path = self.project_path / file
            if file_path.exists():
                startup_files.append(str(file_path))

        return {
            'startup_files': startup_files,
            'has_docker': any('docker' in f.lower() for f in startup_files),
            'has_startup_script': any('.sh' in f for f in startup_files)
        }

    def _calculate_project_metrics(self, analysis_results: Dict[str, AnalysisResult]) -> ProjectMetrics:
        """Calculate overall project metrics"""
        total_files = 0
        lines_of_code = self._count_total_lines()
        duplicated_lines = 0

        # Get data from analysis results
        structure_result = analysis_results.get('structure')
        duplicates_result = analysis_results.get('duplicates')

        if structure_result and structure_result.success:
            file_dist = structure_result.data.get('file_distribution', {})
            total_files = file_dist.get('total_files', 0)

        if duplicates_result and duplicates_result.success:
            duplicated_lines = duplicates_result.data.get('total_duplicated_lines', 0)

        duplication_percentage = (duplicated_lines / lines_of_code * 100) if lines_of_code > 0 else 0

        # Calculate scores (simplified placeholders except organization)
        complexity_score = 0.7  # Would need more sophisticated analysis
        security_score = 0.8
        organization_score = (structure_result.data.get('organization_score', 0.5)
                              if structure_result else 0.5)

        return ProjectMetrics(
            total_files=total_files,
            lines_of_code=lines_of_code,
            duplicated_lines=duplicated_lines,
            complexity_score=complexity_score,
            security_score=security_score,
            organization_score=organization_score,
            duplication_percentage=duplication_percentage
        )
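    # Worked example for the duplication metrics (invented counts): 10,000
    # total lines, 2 duplicate functions (estimated at 10 lines each) and one
    # 5-line block duplicate give duplicated_lines = 25, i.e. 0.25% duplication.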
    def _prioritize_recommendations(self, recommendations: List[str]) -> List[Dict]:
        """Prioritize recommendations by impact and effort"""
        prioritized = []

        for rec in recommendations:
            # Simple keyword-based prioritization
            priority = 'medium'
            impact = 'medium'
            effort = 'medium'

            if any(word in rec.lower() for word in ['security', 'vulnerability', 'secret']):
                priority = 'high'
                impact = 'high'
            elif any(word in rec.lower() for word in ['duplicate', 'redundant']):
                priority = 'high'
                impact = 'medium'
                effort = 'low'
            elif any(word in rec.lower() for word in ['structure', 'organization']):
                priority = 'medium'
                impact = 'high'
                effort = 'high'

            prioritized.append({
                'recommendation': rec,
                'priority': priority,
                'impact': impact,
                'effort': effort
            })

        # Sort by priority (high -> medium -> low)
        priority_order = {'high': 3, 'medium': 2, 'low': 1}
        prioritized.sort(key=lambda x: priority_order.get(x['priority'], 0), reverse=True)

        return prioritized

    def _generate_action_plan(self, prioritized_recommendations: List[Dict]) -> List[Dict]:
        """Generate step-by-step action plan"""
        action_plan = []

        # Group by priority and create phases
        high_priority = [r for r in prioritized_recommendations if r['priority'] == 'high']
        medium_priority = [r for r in prioritized_recommendations if r['priority'] == 'medium']
        low_priority = [r for r in prioritized_recommendations if r['priority'] == 'low']

        if high_priority:
            action_plan.append({
                'phase': 'Critical Issues',
                'description': 'Address high-priority issues first',
                'items': high_priority[:5],  # Limit to top 5
                'estimated_time': '1-2 days'
            })

        if medium_priority:
            action_plan.append({
                'phase': 'Improvements',
                'description': 'Implement medium-priority improvements',
                'items': medium_priority[:7],
                'estimated_time': '3-5 days'
            })

        if low_priority:
            action_plan.append({
                'phase': 'Optimizations',
                'description': 'Apply low-priority optimizations',
                'items': low_priority,
                'estimated_time': '1-2 days'
            })

        return action_plan

    def _calculate_health_score(self, metrics: ProjectMetrics) -> float:
        """Calculate overall project health score (0-100)"""
        score = 0

        # Organization score (25%)
        score += metrics.organization_score * 25

        # Duplication penalty (25%): 20% duplication or more scores 0
        duplication_score = max(0, 1 - (metrics.duplication_percentage / 20))
        score += duplication_score * 25

        # Security score (25%)
        score += metrics.security_score * 25

        # Complexity score (25%): lower complexity is better
        score += (1 - metrics.complexity_score) * 25

        return round(score, 1)

    def _calculate_improvement_potential(self, metrics: ProjectMetrics) -> Dict:
        """Calculate improvement potential in different areas"""
        return {
            'duplication_reduction': f"{metrics.duplication_percentage:.1f}% code can be deduplicated",
            'organization_improvement': f"{(1 - metrics.organization_score) * 100:.1f}% organization improvement possible",
            'security_enhancement': f"{(1 - metrics.security_score) * 100:.1f}% security improvement potential",
            'complexity_reduction': f"{metrics.complexity_score * 100:.1f}% complexity can be reduced"
        }
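    # Worked example for _calculate_health_score (invented metrics):
    # organization 0.8, duplication 10%, security 0.8, complexity 0.7 gives
    # 0.8*25 + (1 - 10/20)*25 + 0.8*25 + (1 - 0.7)*25 = 20 + 12.5 + 20 + 7.5 = 60.0.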
    # Recommendation generators

    def _generate_structure_recommendations(self, structure_data: Dict) -> List[str]:
        """Generate structure-related recommendations"""
        recommendations = []

        if structure_data.get('organization_score', 0) < 0.7:
            recommendations.append("Improve project organization and directory structure")

        naming_conventions = structure_data.get('naming_conventions', {})
        if naming_conventions.get('mixed', 0) > 0:
            recommendations.append("Standardize naming conventions across files and directories")

        if structure_data.get('architecture_type') == 'microservices':
            recommendations.append("Validate microservices architecture best practices")

        return recommendations

    def _generate_duplicate_recommendations(self, duplicate_data: Dict) -> List[str]:
        """Generate duplication-related recommendations"""
        recommendations = []

        if duplicate_data.get('duplication_percentage', 0) > 10:
            recommendations.append(
                "Significant code duplication detected - consider refactoring into shared modules")

        function_dups = duplicate_data.get('function_duplicates', [])
        if function_dups:
            recommendations.append(
                f"Found {len(function_dups)} duplicate functions - consolidate into utility modules")

        import_dups = duplicate_data.get('import_duplicates', [])
        if import_dups:
            recommendations.append("Consolidate repeated import patterns")

        return recommendations

    def _generate_microservices_recommendations(self, microservices_data: Dict) -> List[str]:
        """Generate microservices-related recommendations"""
        recommendations = []

        if not microservices_data.get('database_independence', {}).get('independent_databases', True):
            recommendations.append("Ensure each service has its own database for true independence")

        if not microservices_data.get('api_consistency', {}).get('consistent_error_handling', True):
            recommendations.append("Standardize error handling patterns across all services")

        code_smells = microservices_data.get('code_smells', [])
        if code_smells:
            recommendations.append("Address microservice-specific code smells")

        return recommendations

    def _generate_config_recommendations(self, config_data: Dict) -> List[str]:
        """Generate configuration-related recommendations"""
        recommendations = []

        security_issues = config_data.get('security_issues', [])
        if security_issues:
            recommendations.append("Address security issues in configuration files")

        # Mirror the threshold used in _analyze_config_management: more than
        # three config files suggests configuration is scattered.
        if len(config_data.get('config_files', [])) > 3:
            recommendations.append("Centralize configuration management")

        duplicate_configs = config_data.get('duplicate_configs', [])
        if duplicate_configs:
            recommendations.append("Remove duplicate configuration entries")

        return recommendations


# Main function for testing

def main():
    """Main function for testing the analyzer"""
    import sys

    if len(sys.argv) != 2:
        print("Usage: python codebase_optimizer_engine.py <project_path>")
        sys.exit(1)

    project_path = sys.argv[1]
    if not os.path.exists(project_path):
        print(f"Error: Project path '{project_path}' does not exist")
        sys.exit(1)

    analyzer = CodebaseAnalyzer(project_path)

    # Run analysis
    print("Starting codebase analysis...")

    structure_result = analyzer.analyze_project_structure()
    print(f"Structure analysis: {'✓' if structure_result.success else '✗'}")

    duplicates_result = analyzer.detect_code_duplicates()
    print(f"Duplicate detection: {'✓' if duplicates_result.success else '✗'}")

    microservices_result = analyzer.validate_microservices_architecture()
    print(f"Microservices validation: {'✓' if microservices_result.success else '✗'}")

    config_result = analyzer.optimize_configurations()
    print(f"Configuration analysis: {'✓' if config_result.success else '✗'}")

    # Generate report
    results = {
        'structure': structure_result,
        'duplicates': duplicates_result,
        'microservices': microservices_result,
        'configuration': config_result
    }

    report = analyzer.generate_improvement_report(results)

    print("\n" + "=" * 50)
    print("CODEBASE ANALYSIS REPORT")
    print("=" * 50)
    print(f"Project: {report['project_path']}")
    print(f"Health Score: {report['health_score']}/100")
    print(f"Analysis Date: {report['analysis_date']}")
Metrics:") metrics = report['metrics'] print(f" Total Files: {metrics['total_files']}") print(f" Lines of Code: {metrics['lines_of_code']}") print(f" Duplication: {metrics['duplication_percentage']:.1f}%") print(f" Organization Score: {metrics['organization_score']:.1f}") print(f"\nTop Recommendations:") for i, rec in enumerate(report['prioritized_recommendations'][:5], 1): print(f" {i}. [{rec['priority'].upper()}] {rec['recommendation']}") print(f"\nAction Plan:") for phase in report['action_plan']: print(f" Phase: {phase['phase']} ({phase['estimated_time']})") print(f" {phase['description']}") if __name__ == "__main__": main()
