#!/usr/bin/env python3
"""
CodeBase Optimizer Engine
=========================
An analysis engine for project structure validation, duplicate detection,
and optimization recommendations. Detected patterns are stored in a local
SQLite database so results can be compared and refined across runs.
Author: AI Assistant
License: MIT
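
Example usage (illustrative sketch, not a prescribed workflow; assumes
"/path/to/project" is an existing project directory):

    analyzer = CodebaseAnalyzer("/path/to/project")
    structure = analyzer.analyze_project_structure()
    duplicates = analyzer.detect_code_duplicates(languages=["python"])
    report = analyzer.generate_improvement_report(
        {"structure": structure, "duplicates": duplicates}
    )
    print(report["health_score"])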
"""
import os
import json
import sqlite3
import hashlib
import re
import ast
from pathlib import Path
from typing import Dict, List, Optional, Any
from dataclasses import dataclass
from datetime import datetime
import logging
# Setup logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
@dataclass
class AnalysisResult:
"""Structured result for analysis operations"""
success: bool
data: Dict[str, Any]
errors: List[str]
warnings: List[str]
recommendations: List[str]
confidence: float
@dataclass
class ProjectMetrics:
"""Project health metrics"""
total_files: int
lines_of_code: int
duplicated_lines: int
complexity_score: float
security_score: float
organization_score: float
duplication_percentage: float
class PatternDatabase:
"""SQLite database for learning and storing patterns"""
def __init__(self, db_path: str = "codebase_patterns.db"):
self.db_path = db_path
self.init_database()
def init_database(self):
"""Initialize the pattern database"""
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
# Projects table
cursor.execute('''
CREATE TABLE IF NOT EXISTS projects (
id INTEGER PRIMARY KEY,
name TEXT UNIQUE,
path TEXT,
type TEXT,
last_analyzed TIMESTAMP,
metrics TEXT
)
''')
# Patterns table
cursor.execute('''
CREATE TABLE IF NOT EXISTS patterns (
id INTEGER PRIMARY KEY,
pattern_type TEXT,
pattern_hash TEXT UNIQUE,
pattern_data TEXT,
frequency INTEGER DEFAULT 1,
effectiveness_score REAL DEFAULT 0.5,
first_seen TIMESTAMP,
last_seen TIMESTAMP
)
''')
# Improvements table
cursor.execute('''
CREATE TABLE IF NOT EXISTS improvements (
id INTEGER PRIMARY KEY,
project_id INTEGER,
improvement_type TEXT,
before_metrics TEXT,
after_metrics TEXT,
success_score REAL,
applied_date TIMESTAMP,
FOREIGN KEY (project_id) REFERENCES projects (id)
)
''')
conn.commit()
conn.close()
    def record_pattern(self, pattern_type: str, pattern_data: Dict) -> None:
        """Record a detected pattern, bumping its frequency if it was seen before"""
        # Serialize once so the stored payload and its hash stay consistent
        pattern_json = json.dumps(pattern_data, sort_keys=True, default=str)
        pattern_hash = hashlib.md5(pattern_json.encode()).hexdigest()
        now = datetime.now().isoformat()
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()
        # INSERT OR REPLACE drops any existing row with the same pattern_hash, so
        # frequency and first_seen are carried over via correlated subqueries.
        cursor.execute('''
            INSERT OR REPLACE INTO patterns
            (pattern_type, pattern_hash, pattern_data, frequency, first_seen, last_seen)
            VALUES (?, ?, ?,
                    COALESCE((SELECT frequency FROM patterns WHERE pattern_hash = ?), 0) + 1,
                    COALESCE((SELECT first_seen FROM patterns WHERE pattern_hash = ?), ?),
                    ?)
        ''', (pattern_type, pattern_hash, pattern_json,
              pattern_hash, pattern_hash, now, now))
        conn.commit()
        conn.close()
    def get_learned_patterns(self, pattern_type: Optional[str] = None) -> List[Dict]:
        """Get learned patterns, optionally filtered by type"""
        conn = sqlite3.connect(self.db_path)
        conn.row_factory = sqlite3.Row
        cursor = conn.cursor()
        if pattern_type:
            cursor.execute('SELECT * FROM patterns WHERE pattern_type = ? ORDER BY frequency DESC',
                           (pattern_type,))
        else:
            cursor.execute('SELECT * FROM patterns ORDER BY frequency DESC')
        # Convert rows to plain dicts before the connection is closed
        patterns = [dict(row) for row in cursor.fetchall()]
        conn.close()
        return patterns
class CodebaseAnalyzer:
"""Main analysis engine for codebase optimization"""
def __init__(self, project_path: str):
self.project_path = Path(project_path).resolve()
self.pattern_db = PatternDatabase()
self.supported_languages = {
'.py': 'python',
'.js': 'javascript',
'.ts': 'typescript',
'.java': 'java',
'.go': 'go',
'.rs': 'rust',
'.cpp': 'cpp',
'.c': 'c',
'.php': 'php',
'.rb': 'ruby'
}
self.results = {}
def analyze_project_structure(self) -> AnalysisResult:
"""Analyze overall project structure and organization"""
logger.info(f"Analyzing project structure: {self.project_path}")
try:
structure_data = {
'root_path': str(self.project_path),
'directory_tree': self._build_directory_tree(),
'file_distribution': self._analyze_file_distribution(),
'naming_conventions': self._analyze_naming_conventions(),
'architecture_type': self._detect_architecture_type(),
'organization_score': 0.0
}
# Calculate organization score
structure_data['organization_score'] = self._calculate_organization_score(structure_data)
# Record patterns
self.pattern_db.record_pattern('project_structure', structure_data)
recommendations = self._generate_structure_recommendations(structure_data)
return AnalysisResult(
success=True,
data=structure_data,
errors=[],
warnings=[],
recommendations=recommendations,
confidence=0.9
)
except Exception as e:
logger.error(f"Structure analysis failed: {e}")
return AnalysisResult(
success=False,
data={},
errors=[str(e)],
warnings=[],
recommendations=[],
confidence=0.0
)
    def detect_code_duplicates(self, languages: Optional[List[str]] = None) -> AnalysisResult:
"""Detect code duplications across the project"""
logger.info("Detecting code duplicates")
try:
if languages is None:
languages = list(self.supported_languages.values())
duplicate_data = {
'function_duplicates': self._find_function_duplicates(languages),
'code_block_duplicates': self._find_code_block_duplicates(languages),
'import_duplicates': self._find_import_duplicates(languages),
'configuration_duplicates': self._find_config_duplicates(),
'total_duplicated_lines': 0,
'duplication_percentage': 0.0
}
# Calculate duplication metrics
total_lines = self._count_total_lines()
duplicated_lines = self._count_duplicated_lines(duplicate_data)
duplicate_data['total_duplicated_lines'] = duplicated_lines
duplicate_data['duplication_percentage'] = (duplicated_lines / total_lines * 100) if total_lines > 0 else 0
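            # e.g. (hypothetical figures) 150 duplicated of 3,000 total lines -> 5.0% duplication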
# Record patterns
self.pattern_db.record_pattern('code_duplicates', duplicate_data)
recommendations = self._generate_duplicate_recommendations(duplicate_data)
return AnalysisResult(
success=True,
data=duplicate_data,
errors=[],
warnings=[],
recommendations=recommendations,
confidence=0.85
)
except Exception as e:
logger.error(f"Duplicate detection failed: {e}")
return AnalysisResult(
success=False,
data={},
errors=[str(e)],
warnings=[],
recommendations=[],
confidence=0.0
)
def validate_microservices_architecture(self) -> AnalysisResult:
"""Validate microservices architecture patterns"""
logger.info("Validating microservices architecture")
try:
microservices_data = {
'is_microservices': self._is_microservices_architecture(),
'services_detected': self._detect_services(),
'service_dependencies': self._analyze_service_dependencies(),
'database_independence': self._check_database_independence(),
'api_consistency': self._check_api_consistency(),
'configuration_management': self._analyze_config_management(),
'code_smells': self._detect_microservice_code_smells()
}
# Record patterns
self.pattern_db.record_pattern('microservices_analysis', microservices_data)
recommendations = self._generate_microservices_recommendations(microservices_data)
return AnalysisResult(
success=True,
data=microservices_data,
errors=[],
warnings=[],
recommendations=recommendations,
confidence=0.8
)
except Exception as e:
logger.error(f"Microservices validation failed: {e}")
return AnalysisResult(
success=False,
data={},
errors=[str(e)],
warnings=[],
recommendations=[],
confidence=0.0
)
def optimize_configurations(self) -> AnalysisResult:
"""Analyze and optimize configuration patterns"""
logger.info("Optimizing configurations")
try:
config_data = {
'config_files': self._find_config_files(),
'environment_variables': self._analyze_env_variables(),
'duplicate_configs': self._find_duplicate_configs(),
'security_issues': self._check_config_security(),
'startup_scripts': self._analyze_startup_scripts()
}
# Record patterns
self.pattern_db.record_pattern('configuration_optimization', config_data)
recommendations = self._generate_config_recommendations(config_data)
return AnalysisResult(
success=True,
data=config_data,
errors=[],
warnings=[],
recommendations=recommendations,
confidence=0.85
)
except Exception as e:
logger.error(f"Configuration optimization failed: {e}")
return AnalysisResult(
success=False,
data={},
errors=[str(e)],
warnings=[],
recommendations=[],
confidence=0.0
)
def generate_improvement_report(self, analysis_results: Dict[str, AnalysisResult]) -> Dict[str, Any]:
"""Generate comprehensive improvement report"""
logger.info("Generating improvement report")
# Calculate overall metrics
metrics = self._calculate_project_metrics(analysis_results)
# Prioritize recommendations
all_recommendations = []
for result in analysis_results.values():
all_recommendations.extend(result.recommendations)
prioritized_recommendations = self._prioritize_recommendations(all_recommendations)
# Generate action plan
action_plan = self._generate_action_plan(prioritized_recommendations)
report = {
'project_path': str(self.project_path),
'analysis_date': datetime.now().isoformat(),
'metrics': metrics.__dict__,
'analysis_results': {name: {
'success': result.success,
'confidence': result.confidence,
'errors': result.errors,
'warnings': result.warnings,
'recommendations_count': len(result.recommendations)
} for name, result in analysis_results.items()},
'prioritized_recommendations': prioritized_recommendations,
'action_plan': action_plan,
'health_score': self._calculate_health_score(metrics),
'improvement_potential': self._calculate_improvement_potential(metrics)
}
return report
# Helper methods for analysis
def _build_directory_tree(self) -> Dict:
"""Build directory tree structure"""
tree = {}
for root, dirs, files in os.walk(self.project_path):
# Skip hidden directories and common ignore patterns
dirs[:] = [d for d in dirs if not d.startswith('.') and d not in ['node_modules', '__pycache__', 'venv']]
relative_root = os.path.relpath(root, self.project_path)
if relative_root == '.':
relative_root = ''
tree[relative_root] = {
'directories': dirs.copy(),
'files': files,
'file_count': len(files)
}
return tree
def _analyze_file_distribution(self) -> Dict:
"""Analyze file type distribution"""
distribution = {}
total_files = 0
for root, _, files in os.walk(self.project_path):
for file in files:
ext = Path(file).suffix.lower()
distribution[ext] = distribution.get(ext, 0) + 1
total_files += 1
return {
'by_extension': distribution,
'total_files': total_files,
'languages_detected': [self.supported_languages.get(ext, 'unknown')
for ext in distribution.keys()
if ext in self.supported_languages]
}
def _analyze_naming_conventions(self) -> Dict:
"""Analyze naming convention consistency"""
conventions = {
'snake_case': 0,
'kebab_case': 0,
'camelCase': 0,
'PascalCase': 0,
'mixed': 0
}
for root, dirs, files in os.walk(self.project_path):
for name in dirs + files:
name_no_ext = Path(name).stem
                # Digits are allowed inside names (e.g. "step1.py") so they are not miscounted as mixed
                if re.match(r'^[a-z][a-z0-9]*(_[a-z0-9]+)*$', name_no_ext):
                    conventions['snake_case'] += 1
                elif re.match(r'^[a-z][a-z0-9]*(-[a-z0-9]+)*$', name_no_ext):
                    conventions['kebab_case'] += 1
                elif re.match(r'^[a-z][a-zA-Z0-9]*$', name_no_ext):
                    conventions['camelCase'] += 1
                elif re.match(r'^[A-Z][a-zA-Z0-9]*$', name_no_ext):
                    conventions['PascalCase'] += 1
                else:
                    conventions['mixed'] += 1
return conventions
def _detect_architecture_type(self) -> str:
"""Detect project architecture type"""
        # Look for microservices indicators
        has_docker = any(self.project_path.glob('**/Dockerfile'))
        has_docker_compose = any(self.project_path.glob('**/docker-compose.y*ml'))
has_multiple_services = len([d for d in os.listdir(self.project_path)
if os.path.isdir(os.path.join(self.project_path, d))
and not d.startswith('.')]) > 3
if has_docker and has_docker_compose and has_multiple_services:
return 'microservices'
elif has_multiple_services:
return 'multi-module'
else:
return 'monolith'
def _calculate_organization_score(self, structure_data: Dict) -> float:
"""Calculate project organization score (0-1)"""
score = 0.5 # Base score
# Naming consistency bonus
conventions = structure_data.get('naming_conventions', {})
total_names = sum(conventions.values())
if total_names > 0:
dominant_convention = max(conventions.values())
consistency_ratio = dominant_convention / total_names
score += consistency_ratio * 0.3
# Architecture clarity bonus
if structure_data.get('architecture_type') in ['microservices', 'multi-module']:
score += 0.2
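        # Illustrative arithmetic (hypothetical numbers): 80 of 100 names sharing one convention
        # gives 0.5 + 0.8 * 0.3 = 0.74; a microservices/multi-module layout adds 0.2, capped at 1.0.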
return min(score, 1.0)
def _find_function_duplicates(self, languages: List[str]) -> List[Dict]:
"""Find duplicate functions across the codebase"""
functions = {}
duplicates = []
for root, _, files in os.walk(self.project_path):
for file in files:
ext = Path(file).suffix.lower()
if ext in self.supported_languages and self.supported_languages[ext] in languages:
file_path = os.path.join(root, file)
try:
if ext == '.py':
file_functions = self._extract_python_functions(file_path)
elif ext in ['.js', '.ts']:
file_functions = self._extract_js_functions(file_path)
else:
continue
for func_name, func_content in file_functions.items():
func_hash = hashlib.md5(func_content.encode()).hexdigest()
if func_hash in functions:
# Found duplicate
duplicates.append({
'function_name': func_name,
'files': [functions[func_hash]['file'], file_path],
'content_hash': func_hash,
'similarity': 1.0
})
else:
functions[func_hash] = {
'name': func_name,
'file': file_path,
'content': func_content
}
except Exception as e:
logger.warning(f"Could not analyze {file_path}: {e}")
return duplicates
    def _extract_python_functions(self, file_path: str) -> Dict[str, str]:
        """Extract Python function definitions (including async functions)"""
        functions = {}
        try:
            with open(file_path, 'r', encoding='utf-8') as f:
                content = f.read()
            tree = ast.parse(content)
            source_lines = content.split('\n')
            for node in ast.walk(tree):
                if isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef)):
                    func_content = '\n'.join(source_lines[node.lineno - 1:node.end_lineno])
                    functions[node.name] = func_content
        except Exception:
            # Skip files that cannot be read or parsed
            pass
        return functions
def _extract_js_functions(self, file_path: str) -> Dict[str, str]:
"""Extract JavaScript/TypeScript function definitions"""
functions = {}
try:
with open(file_path, 'r', encoding='utf-8') as f:
content = f.read()
# Simple regex-based extraction for common function patterns
patterns = [
r'function\s+(\w+)\s*\([^)]*\)\s*{[^}]*}',
r'const\s+(\w+)\s*=\s*\([^)]*\)\s*=>\s*{[^}]*}',
r'(\w+)\s*:\s*function\s*\([^)]*\)\s*{[^}]*}',
]
for pattern in patterns:
matches = re.finditer(pattern, content, re.MULTILINE | re.DOTALL)
for match in matches:
func_name = match.group(1)
func_content = match.group(0)
functions[func_name] = func_content
except Exception:
pass
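        # Note: the regexes above only capture bodies without nested braces, e.g.
        # "function add(a, b) { return a + b; }" is matched, while a function with an
        # inner block stops matching at its first "}".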
return functions
def _find_code_block_duplicates(self, languages: List[str]) -> List[Dict]:
"""Find duplicate code blocks"""
# Simplified implementation - could be enhanced with AST analysis
duplicates = []
blocks = {}
for root, _, files in os.walk(self.project_path):
for file in files:
ext = Path(file).suffix.lower()
if ext in self.supported_languages and self.supported_languages[ext] in languages:
file_path = os.path.join(root, file)
try:
with open(file_path, 'r', encoding='utf-8') as f:
lines = f.readlines()
# Look for blocks of 5+ similar lines
for i in range(len(lines) - 4):
block = ''.join(lines[i:i+5]).strip()
if len(block) > 100: # Meaningful block
block_hash = hashlib.md5(block.encode()).hexdigest()
if block_hash in blocks:
duplicates.append({
'files': [blocks[block_hash]['file'], file_path],
'line_ranges': [blocks[block_hash]['lines'], f"{i+1}-{i+5}"],
'block_size': 5,
'content_hash': block_hash
})
else:
blocks[block_hash] = {
'file': file_path,
'lines': f"{i+1}-{i+5}",
'content': block
}
except Exception:
pass
return duplicates
def _find_import_duplicates(self, languages: List[str]) -> List[Dict]:
"""Find duplicate import patterns"""
imports = {}
duplicates = []
for root, _, files in os.walk(self.project_path):
for file in files:
ext = Path(file).suffix.lower()
if ext in self.supported_languages and self.supported_languages[ext] in languages:
file_path = os.path.join(root, file)
try:
with open(file_path, 'r', encoding='utf-8') as f:
content = f.read()
file_imports = []
if ext == '.py':
file_imports = re.findall(r'^(?:from\s+\S+\s+)?import\s+.+$', content, re.MULTILINE)
elif ext in ['.js', '.ts']:
file_imports = re.findall(r'^import\s+.+$', content, re.MULTILINE)
for imp in file_imports:
imp_clean = imp.strip()
if imp_clean in imports:
imports[imp_clean].append(file_path)
else:
imports[imp_clean] = [file_path]
except Exception:
pass
        # Report imports that appear in more than one file
        for imp, file_list in imports.items():
            if len(file_list) > 1:
                duplicates.append({
                    'import_statement': imp,
                    'files': file_list,
                    'usage_count': len(file_list)
                })
return duplicates
def _find_config_duplicates(self) -> List[Dict]:
"""Find duplicate configuration patterns"""
config_files = ['.env', '.env.local', '.env.example', 'config.json', 'package.json', 'requirements.txt']
configs = {}
duplicates = []
for root, _, files in os.walk(self.project_path):
for file in files:
if file in config_files or file.endswith('.conf') or file.endswith('.config'):
file_path = os.path.join(root, file)
                    try:
                        with open(file_path, 'r', encoding='utf-8') as f:
                            content = f.read()
                        # Compare normalized, comment-free content across config files
                        lines = [line.strip() for line in content.split('\n')
                                 if line.strip() and not line.strip().startswith('#')]
                        config_hash = hashlib.md5('\n'.join(sorted(lines)).encode()).hexdigest()
                        if config_hash in configs:
                            duplicates.append({
                                'files': [configs[config_hash], file_path],
                                'similarity': 1.0,
                                'type': 'configuration'
                            })
                        else:
                            configs[config_hash] = file_path
                    except Exception:
                        pass
return duplicates
def _count_total_lines(self) -> int:
"""Count total lines of code"""
total = 0
for root, _, files in os.walk(self.project_path):
for file in files:
ext = Path(file).suffix.lower()
if ext in self.supported_languages:
file_path = os.path.join(root, file)
try:
with open(file_path, 'r', encoding='utf-8') as f:
total += len(f.readlines())
except Exception:
pass
return total
def _count_duplicated_lines(self, duplicate_data: Dict) -> int:
"""Count total duplicated lines"""
duplicated = 0
# Count from function duplicates
duplicated += len(duplicate_data.get('function_duplicates', [])) * 10 # Estimate
# Count from code block duplicates
for block in duplicate_data.get('code_block_duplicates', []):
duplicated += block.get('block_size', 5)
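        # Illustrative count (hypothetical figures): 3 duplicate functions and 2 duplicate
        # 5-line blocks give 3 * 10 + 2 * 5 = 40 estimated duplicated lines.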
return duplicated
def _is_microservices_architecture(self) -> bool:
"""Check if project follows microservices architecture"""
return self._detect_architecture_type() == 'microservices'
def _detect_services(self) -> List[Dict]:
"""Detect individual services in the project"""
services = []
for item in os.listdir(self.project_path):
item_path = os.path.join(self.project_path, item)
if os.path.isdir(item_path) and not item.startswith('.'):
# Check if directory looks like a service
has_main_file = any(os.path.exists(os.path.join(item_path, f))
for f in ['main.py', 'app.py', 'server.js', 'index.js', 'main.go'])
has_config = any(os.path.exists(os.path.join(item_path, f))
for f in ['requirements.txt', 'package.json', 'go.mod'])
if has_main_file or has_config:
services.append({
'name': item,
'path': item_path,
'has_main_file': has_main_file,
'has_config': has_config,
'estimated_language': self._detect_service_language(item_path)
})
return services
def _detect_service_language(self, service_path: str) -> str:
"""Detect primary language of a service"""
file_counts = {}
for root, _, files in os.walk(service_path):
for file in files:
ext = Path(file).suffix.lower()
if ext in self.supported_languages:
lang = self.supported_languages[ext]
file_counts[lang] = file_counts.get(lang, 0) + 1
return max(file_counts.items(), key=lambda x: x[1])[0] if file_counts else 'unknown'
def _analyze_service_dependencies(self) -> Dict:
"""Analyze dependencies between services"""
# Simplified implementation
return {
'internal_dependencies': [],
'external_dependencies': [],
'circular_dependencies': [],
'dependency_graph': {}
}
def _check_database_independence(self) -> Dict:
"""Check if services have independent databases"""
        # Placeholder: returns optimistic defaults; a full check would inspect per-service database configuration
return {
'independent_databases': True,
'shared_database_detected': False,
'database_configs': []
}
def _check_api_consistency(self) -> Dict:
"""Check API consistency across services"""
        # Placeholder: returns optimistic defaults; a full check would inspect API endpoint patterns per service
return {
'consistent_error_handling': True,
'consistent_response_format': True,
'api_versioning': False,
'endpoint_patterns': []
}
def _analyze_config_management(self) -> Dict:
"""Analyze configuration management patterns"""
config_files = []
for root, _, files in os.walk(self.project_path):
for file in files:
if file.startswith('.env') or 'config' in file.lower():
config_files.append(os.path.join(root, file))
return {
'config_files': config_files,
'centralized_config': len(config_files) <= 3,
'environment_separation': any('.env.local' in f or '.env.dev' in f for f in config_files)
}
def _detect_microservice_code_smells(self) -> List[Dict]:
"""Detect microservice-specific code smells"""
code_smells = []
# Example code smells for microservices
services = self._detect_services()
if len(services) > 1:
# Check for shared database smell
# Check for chatty communication smell
# Check for distributed monolith smell
pass
return code_smells
def _find_config_files(self) -> List[str]:
"""Find all configuration files"""
config_patterns = ['*.env*', '*.conf', '*.config', '*.json', '*.yml', '*.yaml', '*.ini']
config_files = []
for pattern in config_patterns:
config_files.extend(list(self.project_path.glob(f'**/{pattern}')))
return [str(f) for f in config_files]
def _analyze_env_variables(self) -> Dict:
"""Analyze environment variable usage"""
env_vars = set()
env_files = []
for root, _, files in os.walk(self.project_path):
for file in files:
if file.startswith('.env'):
file_path = os.path.join(root, file)
env_files.append(file_path)
try:
with open(file_path, 'r') as f:
for line in f:
if '=' in line and not line.strip().startswith('#'):
var_name = line.split('=')[0].strip()
env_vars.add(var_name)
except Exception:
pass
return {
'env_files': env_files,
'total_variables': len(env_vars),
'variables': list(env_vars)
}
def _find_duplicate_configs(self) -> List[Dict]:
"""Find duplicate configuration entries"""
# Implementation would compare configuration files
return []
def _check_config_security(self) -> List[Dict]:
"""Check for security issues in configuration"""
security_issues = []
for root, _, files in os.walk(self.project_path):
for file in files:
if file.startswith('.env') and not file.endswith('.example'):
file_path = os.path.join(root, file)
try:
with open(file_path, 'r') as f:
content = f.read()
if 'password' in content.lower() or 'secret' in content.lower():
security_issues.append({
'file': file_path,
'issue': 'Potential secrets in configuration file',
'severity': 'high'
})
except Exception:
pass
return security_issues
def _analyze_startup_scripts(self) -> Dict:
"""Analyze startup scripts and procedures"""
startup_files = []
for file in ['start.sh', 'start-local.sh', 'run.sh', 'docker-compose.yml', 'Dockerfile']:
file_path = self.project_path / file
if file_path.exists():
startup_files.append(str(file_path))
return {
'startup_files': startup_files,
'has_docker': any('docker' in f.lower() for f in startup_files),
'has_startup_script': any('.sh' in f for f in startup_files)
}
def _calculate_project_metrics(self, analysis_results: Dict[str, AnalysisResult]) -> ProjectMetrics:
"""Calculate overall project metrics"""
total_files = 0
lines_of_code = self._count_total_lines()
duplicated_lines = 0
# Get data from analysis results
structure_result = analysis_results.get('structure')
duplicates_result = analysis_results.get('duplicates')
if structure_result and structure_result.success:
file_dist = structure_result.data.get('file_distribution', {})
total_files = file_dist.get('total_files', 0)
if duplicates_result and duplicates_result.success:
duplicated_lines = duplicates_result.data.get('total_duplicated_lines', 0)
duplication_percentage = (duplicated_lines / lines_of_code * 100) if lines_of_code > 0 else 0
# Calculate scores (simplified)
complexity_score = 0.7 # Would need more sophisticated analysis
security_score = 0.8
organization_score = structure_result.data.get('organization_score', 0.5) if structure_result else 0.5
return ProjectMetrics(
total_files=total_files,
lines_of_code=lines_of_code,
duplicated_lines=duplicated_lines,
complexity_score=complexity_score,
security_score=security_score,
organization_score=organization_score,
duplication_percentage=duplication_percentage
)
def _prioritize_recommendations(self, recommendations: List[str]) -> List[Dict]:
"""Prioritize recommendations by impact and effort"""
prioritized = []
for rec in recommendations:
# Simple prioritization based on keywords
priority = 'medium'
impact = 'medium'
effort = 'medium'
if any(word in rec.lower() for word in ['security', 'vulnerability', 'secret']):
priority = 'high'
impact = 'high'
elif any(word in rec.lower() for word in ['duplicate', 'redundant']):
priority = 'high'
impact = 'medium'
effort = 'low'
elif any(word in rec.lower() for word in ['structure', 'organization']):
priority = 'medium'
impact = 'high'
effort = 'high'
prioritized.append({
'recommendation': rec,
'priority': priority,
'impact': impact,
'effort': effort
})
# Sort by priority (high -> medium -> low)
priority_order = {'high': 3, 'medium': 2, 'low': 1}
prioritized.sort(key=lambda x: priority_order.get(x['priority'], 0), reverse=True)
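        # Example (hypothetical input): "Remove hard-coded secret" is tagged high priority,
        # "Consolidate duplicate helpers" high with low effort, and a structure/organization
        # note medium, so both high items sort ahead of the structure note.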
return prioritized
def _generate_action_plan(self, prioritized_recommendations: List[Dict]) -> List[Dict]:
"""Generate step-by-step action plan"""
action_plan = []
# Group by priority and create phases
high_priority = [r for r in prioritized_recommendations if r['priority'] == 'high']
medium_priority = [r for r in prioritized_recommendations if r['priority'] == 'medium']
low_priority = [r for r in prioritized_recommendations if r['priority'] == 'low']
if high_priority:
action_plan.append({
'phase': 'Critical Issues',
'description': 'Address high-priority issues first',
'items': high_priority[:5], # Limit to top 5
'estimated_time': '1-2 days'
})
if medium_priority:
action_plan.append({
'phase': 'Improvements',
'description': 'Implement medium-priority improvements',
'items': medium_priority[:7],
'estimated_time': '3-5 days'
})
if low_priority:
action_plan.append({
'phase': 'Optimizations',
'description': 'Apply low-priority optimizations',
'items': low_priority,
'estimated_time': '1-2 days'
})
return action_plan
def _calculate_health_score(self, metrics: ProjectMetrics) -> float:
"""Calculate overall project health score (0-100)"""
score = 0
# Organization score (25%)
score += metrics.organization_score * 25
# Duplication penalty (25%)
duplication_score = max(0, 1 - (metrics.duplication_percentage / 20)) # 20% duplication = 0 score
score += duplication_score * 25
# Security score (25%)
score += metrics.security_score * 25
# Complexity score (25%)
score += (1 - metrics.complexity_score) * 25 # Lower complexity is better
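        # Illustrative arithmetic (hypothetical values): organization 0.8, duplication 4%,
        # security 0.8, complexity 0.7 -> 0.8*25 + (1 - 4/20)*25 + 0.8*25 + (1 - 0.7)*25 = 67.5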
return round(score, 1)
def _calculate_improvement_potential(self, metrics: ProjectMetrics) -> Dict:
"""Calculate improvement potential in different areas"""
return {
'duplication_reduction': f"{metrics.duplication_percentage:.1f}% code can be deduplicated",
'organization_improvement': f"{(1-metrics.organization_score)*100:.1f}% organization improvement possible",
'security_enhancement': f"{(1-metrics.security_score)*100:.1f}% security improvement potential",
'complexity_reduction': f"{metrics.complexity_score*100:.1f}% complexity can be reduced"
}
# Recommendation generators
def _generate_structure_recommendations(self, structure_data: Dict) -> List[str]:
"""Generate structure-related recommendations"""
recommendations = []
if structure_data.get('organization_score', 0) < 0.7:
recommendations.append("Improve project organization and directory structure")
naming_conventions = structure_data.get('naming_conventions', {})
if naming_conventions.get('mixed', 0) > 0:
recommendations.append("Standardize naming conventions across files and directories")
if structure_data.get('architecture_type') == 'microservices':
recommendations.append("Validate microservices architecture best practices")
return recommendations
def _generate_duplicate_recommendations(self, duplicate_data: Dict) -> List[str]:
"""Generate duplication-related recommendations"""
recommendations = []
if duplicate_data.get('duplication_percentage', 0) > 10:
recommendations.append("Significant code duplication detected - consider refactoring into shared modules")
function_dups = duplicate_data.get('function_duplicates', [])
if function_dups:
recommendations.append(f"Found {len(function_dups)} duplicate functions - consolidate into utility modules")
import_dups = duplicate_data.get('import_duplicates', [])
if import_dups:
recommendations.append("Consolidate repeated import patterns")
return recommendations
def _generate_microservices_recommendations(self, microservices_data: Dict) -> List[str]:
"""Generate microservices-related recommendations"""
recommendations = []
if not microservices_data.get('database_independence', {}).get('independent_databases', True):
recommendations.append("Ensure each service has its own database for true independence")
if not microservices_data.get('api_consistency', {}).get('consistent_error_handling', True):
recommendations.append("Standardize error handling patterns across all services")
code_smells = microservices_data.get('code_smells', [])
if code_smells:
recommendations.append("Address microservice-specific code smells")
return recommendations
def _generate_config_recommendations(self, config_data: Dict) -> List[str]:
"""Generate configuration-related recommendations"""
recommendations = []
security_issues = config_data.get('security_issues', [])
if security_issues:
recommendations.append("Address security issues in configuration files")
        env_info = config_data.get('environment_variables', {})
        # The env analysis exposes no 'centralized_config' flag; use the number of env files as a proxy
        if len(env_info.get('env_files', [])) > 3:
            recommendations.append("Centralize configuration management")
duplicate_configs = config_data.get('duplicate_configs', [])
if duplicate_configs:
recommendations.append("Remove duplicate configuration entries")
return recommendations
# Main function for testing
def main():
"""Main function for testing the analyzer"""
import sys
if len(sys.argv) != 2:
print("Usage: python codebase_optimizer_engine.py <project_path>")
sys.exit(1)
project_path = sys.argv[1]
if not os.path.exists(project_path):
print(f"Error: Project path '{project_path}' does not exist")
sys.exit(1)
analyzer = CodebaseAnalyzer(project_path)
# Run analysis
print("Starting codebase analysis...")
structure_result = analyzer.analyze_project_structure()
print(f"Structure analysis: {'✓' if structure_result.success else '✗'}")
duplicates_result = analyzer.detect_code_duplicates()
print(f"Duplicate detection: {'✓' if duplicates_result.success else '✗'}")
microservices_result = analyzer.validate_microservices_architecture()
print(f"Microservices validation: {'✓' if microservices_result.success else '✗'}")
config_result = analyzer.optimize_configurations()
print(f"Configuration analysis: {'✓' if config_result.success else '✗'}")
# Generate report
results = {
'structure': structure_result,
'duplicates': duplicates_result,
'microservices': microservices_result,
'configuration': config_result
}
report = analyzer.generate_improvement_report(results)
print("\n" + "="*50)
print("CODEBASE ANALYSIS REPORT")
print("="*50)
print(f"Project: {report['project_path']}")
print(f"Health Score: {report['health_score']}/100")
print(f"Analysis Date: {report['analysis_date']}")
print(f"\nProject Metrics:")
metrics = report['metrics']
print(f" Total Files: {metrics['total_files']}")
print(f" Lines of Code: {metrics['lines_of_code']}")
print(f" Duplication: {metrics['duplication_percentage']:.1f}%")
print(f" Organization Score: {metrics['organization_score']:.1f}")
print(f"\nTop Recommendations:")
for i, rec in enumerate(report['prioritized_recommendations'][:5], 1):
print(f" {i}. [{rec['priority'].upper()}] {rec['recommendation']}")
print(f"\nAction Plan:")
for phase in report['action_plan']:
print(f" Phase: {phase['phase']} ({phase['estimated_time']})")
print(f" {phase['description']}")
if __name__ == "__main__":
main()