"""
Codebase analyzer for different programming languages.

This module provides the analysis functions for extracting the structure,
dependencies, and components of a codebase across different programming languages.
"""
import os
import logging
import json
import re
from typing import List, Dict, Any
from src.analyzers.base_analyzer import BaseAnalyzer
from src.schemas import AnalysisOperationResult
logger = logging.getLogger(__name__)
class CodebaseAnalyzer(BaseAnalyzer):
"""
    Codebase analyzer implementation for different languages.

    Inherits from `BaseAnalyzer` and provides the concrete analysis implementations.
"""
    async def analyze(self) -> AnalysisOperationResult:
"""
Enhanced analysis method that adds MCP detection and semantic classification.
"""
# First run the base analysis
base_result = await super().analyze()
# If base analysis failed, return immediately
if not base_result.success:
return base_result
try:
logger.info("Running enhanced analysis for MCP detection and classification...")
# Import enhanced analyzer
from .enhanced_analyzer import EnhancedAnalyzer
# Collect Python file contents for analysis
file_contents = {}
for root, dirs, files in os.walk(self.working_path):
for file in files:
if file.endswith('.py'):
file_path = os.path.join(root, file)
try:
with open(file_path, 'r', encoding='utf-8', errors='ignore') as f:
rel_path = os.path.relpath(file_path, self.working_path)
file_contents[rel_path] = f.read()
except Exception as e:
logger.warning(f"Could not read {file_path}: {e}")
continue
if file_contents:
enhanced_analyzer = EnhancedAnalyzer()
enhanced_results = enhanced_analyzer.analyze(
codebase_path=self.working_path,
file_contents=file_contents,
dependencies=base_result.data.get('dependencies', []),
project_structure=base_result.data.get('project_structure', {})
)
# Add enhanced analysis to results
base_result.data['classification'] = enhanced_results.get('classification')
base_result.data['mcp_server_info'] = enhanced_results.get('mcp_server_info')
base_result.data['specialized_analysis'] = enhanced_results.get('specialized_analysis')
classification = enhanced_results.get('classification', {})
logger.info(f"Enhanced analysis complete: {classification.get('primary_type', 'unknown')}")
except Exception as e:
logger.warning(f"Enhanced analysis failed: {e}, continuing with base results")
# Continue with regular results if enhanced analysis fails
return base_result
async def _analyze_dependencies(self) -> List[Dict[str, Any]]:
"""Analyze project dependencies with versions."""
logger.info(f"Analyzing dependencies for {self.path}")
dependencies = []
# Python dependencies
requirements_files = ['requirements.txt', 'requirements.pip', 'requirements-dev.txt']
for req_file in requirements_files:
req_path = os.path.join(self.working_path, req_file)
if os.path.exists(req_path):
try:
with open(req_path, 'r') as f:
for line in f:
line = line.strip()
if line and not line.startswith('#') and not line.startswith('-'):
                                # Parse lines like: package==1.0.0, package>=1.0.0, or a bare package name
                                match = re.match(r'^([a-zA-Z0-9_\-\[\]]+)\s*([><=~!]=?)?\s*(.+)?$', line)
if match:
name = match.group(1).strip()
operator = match.group(2) if match.group(2) else None
version = match.group(3).strip() if match.group(3) else None
# Skip if already added
if not any(d['name'] == name for d in dependencies):
dependencies.append({
'name': name,
'version': version if version else 'latest',
'operator': operator if operator else '',
'source': req_file,
'type': 'python'
})
except Exception as e:
logger.warning(f"Error reading {req_file}: {e}")
# Check setup.py for dependencies
setup_py = os.path.join(self.working_path, 'setup.py')
if os.path.exists(setup_py):
try:
with open(setup_py, 'r') as f:
content = f.read()
if 'install_requires' in content:
pattern = r'install_requires\s*=\s*\[(.*?)\]'
match = re.search(pattern, content, re.DOTALL)
if match:
requires = match.group(1)
for line in requires.split(','):
line = line.strip().strip('"').strip("'")
if line:
                                    match = re.match(r'^([a-zA-Z0-9_\-\[\]]+)\s*([><=~!]=?)?\s*(.+)?$', line)
if match:
name = match.group(1).strip()
if not any(d['name'] == name for d in dependencies):
operator = match.group(2) if match.group(2) else None
version = match.group(3).strip() if match.group(3) else None
dependencies.append({
'name': name,
'version': version if version else 'latest',
'operator': operator if operator else '',
'source': 'setup.py',
'type': 'python'
})
except Exception as e:
logger.warning(f"Error reading setup.py: {e}")
# Node.js dependencies
package_json = os.path.join(self.working_path, 'package.json')
if os.path.exists(package_json):
try:
with open(package_json, 'r') as f:
data = json.load(f)
for dep_type in ['dependencies', 'devDependencies']:
if dep_type in data:
for name, version in data[dep_type].items():
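                                # npm range prefixes such as '^1.2.3' or '~1.2.3' are split into
                                # a bare version plus the prefix, recorded separately as the operator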
if not any(d['name'] == name for d in dependencies):
dependencies.append({
'name': name,
'version': version.lstrip('^~'),
'operator': version[0] if version and version[0] in ['^', '~'] else '',
'source': 'package.json',
'type': 'javascript',
'dev': dep_type == 'devDependencies'
})
except Exception as e:
logger.warning(f"Error reading package.json: {e}")
logger.info(f"Found {len(dependencies)} dependencies")
return dependencies
async def _extract_api_endpoints(self) -> List[Dict[str, Any]]:
"""Extract API endpoints from Flask, FastAPI, Django, Express."""
logger.info(f"Extracting API endpoints for {self.path}")
endpoints = []
try:
# Extract from Python files (Flask, FastAPI, Django)
for root, dirs, files in os.walk(self.working_path):
# Skip common non-code directories
dirs[:] = [d for d in dirs if d not in ['.git', '__pycache__', 'node_modules', 'venv', '.env']]
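                # Pruning dirs in place makes os.walk skip these directories entirely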
for file in files:
if file.endswith('.py'):
file_path = os.path.join(root, file)
try:
with open(file_path, 'r', encoding='utf-8', errors='ignore') as f:
content = f.read()
# Flask: @app.route() or @bp.route()
flask_pattern = r'@(?:app|bp|blueprint)\.route\([\'"]([^\'"]+)[\'"](?:,\s*methods=\[([^\]]+)\])?\)'
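                            # e.g. @app.route('/users', methods=['GET', 'POST']) yields path='/users', methods='GET,POST'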
for match in re.finditer(flask_pattern, content):
path, methods = match.groups()
endpoints.append({
'path': path,
'methods': methods.replace("'", "").replace('"', '').replace(' ', '') if methods else 'GET',
'framework': 'Flask',
'file': os.path.relpath(file_path, self.working_path)
})
# FastAPI: @app.get(), @app.post(), @router.get(), etc.
fastapi_pattern = r'@(?:app|router)\.(?P<method>get|post|put|delete|patch|options|head)\([\'"](?P<path>[^\'"]+)[\'"]'
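                            # e.g. @router.get("/items/{item_id}") yields methods='GET', path='/items/{item_id}'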
for match in re.finditer(fastapi_pattern, content):
method = match.group('method').upper()
path = match.group('path')
endpoints.append({
'path': path,
'methods': method,
'framework': 'FastAPI',
'file': os.path.relpath(file_path, self.working_path)
})
# Django: path() and re_path() in urls.py
if 'urls.py' in file:
django_pattern = r'(?:path|re_path)\([\'"]([^\'"]+)[\'"]'
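                                # Django URL patterns do not declare HTTP verbs, so 'GET,POST' is recorded as a default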
for match in re.finditer(django_pattern, content):
path = match.group(1)
endpoints.append({
'path': f'/{path}',
'methods': 'GET,POST',
'framework': 'Django',
'file': os.path.relpath(file_path, self.working_path)
})
except Exception as e:
logger.debug(f"Error reading {file_path}: {e}")
# Extract from JavaScript/TypeScript files (Express)
elif file.endswith(('.js', '.ts')):
file_path = os.path.join(root, file)
try:
with open(file_path, 'r', encoding='utf-8', errors='ignore') as f:
content = f.read()
# Express: app.get(), router.post(), etc.
express_pattern = r'(?:app|router)\.(?P<method>get|post|put|delete|patch|all|use)\([\'"`](?P<path>[^\'"` ]+)[\'"`]'
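                            # app.use(...) registers middleware for every verb, so USE is reported as ALL below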
for match in re.finditer(express_pattern, content):
method = match.group('method').upper()
path = match.group('path')
if not path.startswith('('): # Skip middleware patterns
endpoints.append({
'path': path,
'methods': method if method != 'USE' else 'ALL',
'framework': 'Express',
'file': os.path.relpath(file_path, self.working_path)
})
except Exception as e:
logger.debug(f"Error reading {file_path}: {e}")
# Remove duplicates based on path and method
seen = set()
unique_endpoints = []
for ep in endpoints:
key = (ep['path'], ep['methods'])
if key not in seen:
seen.add(key)
unique_endpoints.append(ep)
logger.info(f"Found {len(unique_endpoints)} API endpoints")
return unique_endpoints
except Exception as e:
logger.error(f"Error extracting API endpoints: {e}")
return []
async def _analyze_architecture(self) -> Dict[str, Any]:
"""Analyze project architecture and detect patterns."""
logger.info(f"Analyzing architecture for {self.path}")
architecture = {
'layers': [],
'components': [],
'patterns': [],
'entry_points': [],
'directory_structure': {}
}
try:
# Analyze directory structure to detect architectural layers
for root, dirs, files in os.walk(self.working_path):
# Skip common non-code directories
dirs[:] = [d for d in dirs if d not in ['.git', '__pycache__', 'node_modules', 'venv', '.env', 'dist', 'build']]
rel_path = os.path.relpath(root, self.working_path)
dir_name = os.path.basename(root)
# Detect architectural layers
layer_mappings = {
'controllers': 'Presentation Layer',
'views': 'Presentation Layer',
'routes': 'API Layer',
'api': 'API Layer',
'models': 'Data Access Layer',
'database': 'Data Access Layer',
'db': 'Data Access Layer',
'services': 'Business Logic Layer',
'core': 'Business Logic Layer',
'business': 'Business Logic Layer',
'utils': 'Utility Layer',
'helpers': 'Utility Layer',
'middleware': 'Middleware Layer',
'config': 'Configuration Layer',
'tests': 'Testing Layer',
'migrations': 'Database Migration Layer'
}
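                # Matching below is a case-insensitive substring test on the directory name,
                # so e.g. 'api_v1' is classified under the API Layer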
for keyword, layer in layer_mappings.items():
if keyword in dir_name.lower():
if layer not in architecture['layers']:
architecture['layers'].append(layer)
architecture['components'].append({
'name': dir_name,
'layer': layer,
'path': rel_path,
'files': len(files)
})
break
# Store directory structure
if rel_path != '.' and len(files) > 0:
architecture['directory_structure'][rel_path] = {
'files': len(files),
'subdirs': len(dirs)
}
# Detect entry points
entry_files = [
'main.py', 'app.py', 'server.py', 'run.py', '__main__.py',
'index.js', 'server.js', 'app.js', 'main.js',
'index.ts', 'server.ts', 'app.ts', 'main.ts'
]
for entry in entry_files:
entry_path = os.path.join(self.working_path, entry)
if os.path.exists(entry_path):
architecture['entry_points'].append(entry)
            # Detect architectural patterns from the component names collected above
            component_names = ' '.join(c['name'].lower() for c in architecture['components'])
            if all(k in component_names for k in ('models', 'views', 'controllers')):
                architecture['patterns'].append('MVC (Model-View-Controller)')
            if 'services' in component_names and 'models' in component_names:
                architecture['patterns'].append('Service Layer Pattern')
            if 'api' in component_names or 'routes' in component_names:
                architecture['patterns'].append('RESTful API Architecture')
            if 'middleware' in component_names:
                architecture['patterns'].append('Middleware Pattern')
# Detect microservices indicators
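            # Presence of docker-compose.yml is only a rough signal; a compose file may describe a single service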
docker_compose = os.path.join(self.working_path, 'docker-compose.yml')
if os.path.exists(docker_compose):
architecture['patterns'].append('Microservices Architecture')
logger.info(f"Detected {len(architecture['layers'])} architectural layers and {len(architecture['patterns'])} patterns")
return architecture
except Exception as e:
logger.error(f"Error analyzing architecture: {e}")
return architecture
async def _calculate_metrics(self) -> Dict[str, Any]:
"""Calculate code metrics."""
logger.info(f"Calculating metrics for {self.path}")
metrics = {
'total_files': 0,
'total_lines': 0,
'file_types': {},
'largest_file': None,
'languages': {}
}
# Count files and lines
def analyze_directory(path):
try:
for item in os.listdir(path):
item_path = os.path.join(path, item)
if os.path.isfile(item_path):
metrics['total_files'] += 1
# Get file extension
_, ext = os.path.splitext(item_path)
if ext:
metrics['file_types'][ext] = metrics['file_types'].get(ext, 0) + 1
# Count lines in text files
try:
with open(item_path, 'r', encoding='utf-8', errors='ignore') as f:
lines = len(f.readlines())
metrics['total_lines'] += lines
# Track largest file
if not metrics['largest_file'] or lines > metrics['largest_file']['lines']:
metrics['largest_file'] = {
'path': item_path.replace(self.working_path, ''),
'lines': lines
}
                        except Exception:
                            pass
                    elif (os.path.isdir(item_path) and not item.startswith('.')
                          and item not in ('__pycache__', 'node_modules', 'venv', 'dist', 'build')):
# Recursively analyze subdirectories
analyze_directory(item_path)
except Exception as e:
logger.warning(f"Error analyzing directory {path}: {e}")
analyze_directory(self.working_path)
# Determine primary language based on file extensions
language_extensions = {
'.py': 'Python',
'.js': 'JavaScript',
'.ts': 'TypeScript',
'.java': 'Java',
'.go': 'Go',
'.rs': 'Rust',
'.cpp': 'C++',
'.c': 'C',
'.rb': 'Ruby',
'.php': 'PHP'
}
for ext, count in metrics['file_types'].items():
if ext in language_extensions:
lang = language_extensions[ext]
metrics['languages'][lang] = metrics['languages'].get(lang, 0) + count
return metrics
# Additional methods required by documentation_tools.py
async def analyze_with_pagination(self) -> AnalysisOperationResult:
"""Analyze codebase with pagination support."""
logger.info(f"Analyzing with pagination for {self.path}")
# For now, just delegate to the regular analyze method
# In a full implementation, this would handle chunking and pagination
return await self.analyze()
async def _detect_frameworks(self) -> List[Dict[str, Any]]:
"""Detect frameworks and technology stack with confidence scores."""
logger.info(f"Detecting frameworks for {self.path}")
frameworks = []
# Check for Python frameworks
requirements_files = ['requirements.txt', 'setup.py', 'pyproject.toml']
for req_file in requirements_files:
req_path = os.path.join(self.working_path, req_file)
if os.path.exists(req_path):
try:
with open(req_path, 'r', encoding='utf-8', errors='ignore') as f:
content = f.read().lower()
# Check for common Python frameworks
if 'fastapi' in content:
frameworks.append({
'name': 'FastAPI',
'category': 'web_framework',
'confidence': 0.9,
'description': 'Modern, fast web framework for building APIs'
})
if 'flask' in content:
frameworks.append({
'name': 'Flask',
'category': 'web_framework',
'confidence': 0.9,
'description': 'Lightweight WSGI web application framework'
})
if 'django' in content:
frameworks.append({
'name': 'Django',
'category': 'web_framework',
'confidence': 0.9,
'description': 'High-level Python Web framework'
})
if 'streamlit' in content:
frameworks.append({
'name': 'Streamlit',
'category': 'ui_framework',
'confidence': 0.8,
'description': 'Framework for creating data apps'
})
if 'langchain' in content:
frameworks.append({
'name': 'LangChain',
'category': 'ai_framework',
'confidence': 0.8,
'description': 'Framework for developing LLM applications'
})
except Exception as e:
logger.warning(f"Error reading {req_file}: {e}")
# Check for Node.js frameworks
package_json = os.path.join(self.working_path, 'package.json')
if os.path.exists(package_json):
try:
with open(package_json, 'r', encoding='utf-8', errors='ignore') as f:
data = json.load(f)
deps = {**data.get('dependencies', {}), **data.get('devDependencies', {})}
if 'react' in deps:
frameworks.append({
'name': 'React',
'category': 'frontend_framework',
'confidence': 0.9,
'description': 'JavaScript library for building user interfaces'
})
if 'express' in deps:
frameworks.append({
'name': 'Express.js',
'category': 'web_framework',
'confidence': 0.9,
'description': 'Fast, unopinionated web framework for Node.js'
})
if 'next' in deps:
frameworks.append({
'name': 'Next.js',
'category': 'fullstack_framework',
'confidence': 0.9,
'description': 'React framework for production'
})
except Exception as e:
logger.warning(f"Error reading package.json: {e}")
return frameworks
async def _analyze_database_schemas(self) -> List[Dict[str, Any]]:
"""Analyze database schemas from various sources."""
logger.info(f"Analyzing database schemas for {self.path}")
schemas = []
# Look for SQL files
sql_files = []
for root, dirs, files in os.walk(self.working_path):
for file in files:
if file.endswith(('.sql', '.ddl')):
sql_files.append(os.path.join(root, file))
# Analyze SQL files
for sql_file in sql_files:
try:
with open(sql_file, 'r', encoding='utf-8', errors='ignore') as f:
content = f.read()
# Basic SQL parsing for CREATE TABLE statements
tables = self._parse_sql_tables(content)
if tables:
schemas.append({
'source': sql_file.replace(self.working_path, ''),
'type': 'sql',
'tables': tables
})
except Exception as e:
logger.warning(f"Error reading SQL file {sql_file}: {e}")
# Look for Python ORM models (SQLAlchemy, Django, etc.)
python_files = []
for root, dirs, files in os.walk(self.working_path):
for file in files:
if file.endswith('.py') and ('model' in file.lower() or 'schema' in file.lower()):
python_files.append(os.path.join(root, file))
# Analyze Python model files
for py_file in python_files:
try:
with open(py_file, 'r', encoding='utf-8', errors='ignore') as f:
content = f.read()
# Look for ORM patterns
                    if any(pattern in content for pattern in ['Model', 'Table', 'Column', 'models.', 'db.']):
models = self._parse_python_models(content)
if models:
schemas.append({
'source': py_file.replace(self.working_path, ''),
'type': 'orm',
'models': models
})
except Exception as e:
logger.warning(f"Error reading Python file {py_file}: {e}")
return schemas
async def _parse_code_ast(self) -> List[Dict[str, Any]]:
"""Parse code using AST for detailed analysis with complexity metrics."""
logger.info(f"Parsing AST for {self.path}")
ast_data = []
try:
import ast as python_ast
# Find all Python files
for root, dirs, files in os.walk(self.working_path):
# Skip common non-code directories
dirs[:] = [d for d in dirs if d not in ['.git', '__pycache__', 'node_modules', 'venv', '.env']]
for file in files:
if file.endswith('.py'):
file_path = os.path.join(root, file)
try:
with open(file_path, 'r', encoding='utf-8', errors='ignore') as f:
source_code = f.read()
tree = python_ast.parse(source_code)
file_analysis = {
'file': os.path.relpath(file_path, self.working_path),
'functions': [],
'classes': [],
'imports': [],
'total_complexity': 0,
'lines_of_code': len(source_code.splitlines()),
'maintainability_index': 0
}
# Extract imports
for node in python_ast.walk(tree):
if isinstance(node, python_ast.Import):
for alias in node.names:
file_analysis['imports'].append(alias.name)
elif isinstance(node, python_ast.ImportFrom):
if node.module:
file_analysis['imports'].append(node.module)
# Analyze functions
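                                # Note: ast.walk visits nested nodes, so class methods are counted here as well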
for node in python_ast.walk(tree):
                                    if isinstance(node, (python_ast.FunctionDef, python_ast.AsyncFunctionDef)):
complexity = self._calculate_cyclomatic_complexity(node)
# Count parameters
num_params = len(node.args.args)
# Calculate lines
if hasattr(node, 'end_lineno') and hasattr(node, 'lineno'):
lines = node.end_lineno - node.lineno
else:
lines = 0
# Determine complexity rating
if complexity <= 5:
rating = 'Simple'
elif complexity <= 10:
rating = 'Moderate'
elif complexity <= 20:
rating = 'Complex'
else:
rating = 'Very Complex'
function_info = {
'name': node.name,
'complexity': complexity,
'complexity_rating': rating,
'parameters': num_params,
'lines': lines,
'is_async': isinstance(node, python_ast.AsyncFunctionDef)
}
file_analysis['functions'].append(function_info)
file_analysis['total_complexity'] += complexity
elif isinstance(node, python_ast.ClassDef):
# Analyze class
methods = [n for n in node.body if isinstance(n, (python_ast.FunctionDef, python_ast.AsyncFunctionDef))]
if hasattr(node, 'end_lineno') and hasattr(node, 'lineno'):
lines = node.end_lineno - node.lineno
else:
lines = 0
class_info = {
'name': node.name,
'methods': len(methods),
'lines': lines,
'has_init': any(m.name == '__init__' for m in methods)
}
file_analysis['classes'].append(class_info)
                                # Maintainability index (simplified): the standard formula is
                                # MI = 171 - 5.2 * ln(V) - 0.23 * G - 16.2 * ln(LOC); here ln(LOC) stands in
                                # for the Halstead volume V, and the result is clamped to 0-100 (higher is better)
if file_analysis['lines_of_code'] > 0:
import math
loc = file_analysis['lines_of_code']
complexity = file_analysis['total_complexity'] if file_analysis['total_complexity'] > 0 else 1
mi = max(0, min(100,
171 - 5.2 * math.log(loc) - 0.23 * complexity - 16.2 * math.log(loc)
))
file_analysis['maintainability_index'] = round(mi, 2)
ast_data.append(file_analysis)
except SyntaxError as e:
logger.debug(f"Syntax error in {file_path}: {e}")
except Exception as e:
logger.debug(f"Error parsing {file_path}: {e}")
logger.info(f"Parsed {len(ast_data)} Python files with AST")
return ast_data
except Exception as e:
logger.error(f"Error in AST parsing: {e}")
return []
def _calculate_cyclomatic_complexity(self, node) -> int:
"""Calculate cyclomatic complexity of a function using AST."""
import ast as python_ast
complexity = 1 # Base complexity
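        # Cyclomatic complexity approximates the number of independent paths through the
        # function; each decision point counted below adds one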
for child in python_ast.walk(node):
# Decision points that increase complexity
if isinstance(child, (python_ast.If, python_ast.While, python_ast.For,
python_ast.ExceptHandler, python_ast.With)):
complexity += 1
elif isinstance(child, python_ast.BoolOp):
# Each additional boolean operator (and/or)
complexity += len(child.values) - 1
elif isinstance(child, (python_ast.Break, python_ast.Continue)):
complexity += 1
return complexity
    def _parse_python_files_basic(self) -> List[Dict[str, Any]]:
        """Lightweight AST pass over Python files (simpler companion to `_parse_code_ast`)."""
        import ast as python_ast
        ast_data = []
        # Look for Python files
        python_files = []
        for root, dirs, files in os.walk(self.working_path):
            for file in files:
                if file.endswith('.py'):
                    python_files.append(os.path.join(root, file))
        # Analyze Python files with AST
        for py_file in python_files[:20]:  # Limit to first 20 files for performance
            try:
                with open(py_file, 'r', encoding='utf-8', errors='ignore') as f:
                    content = f.read()
                # Basic AST analysis
                try:
                    tree = python_ast.parse(content)
                    classes = []
                    functions = []
                    imports = []
                    # Track method line numbers so methods are not double-counted as functions
                    method_linenos = set()
                    for node in python_ast.walk(tree):
                        if isinstance(node, python_ast.ClassDef):
                            methods = [m for m in node.body
                                       if isinstance(m, (python_ast.FunctionDef, python_ast.AsyncFunctionDef))]
                            method_linenos.update(m.lineno for m in methods)
                            classes.append({
                                'name': node.name,
                                'line': node.lineno,
                                'methods': [m.name for m in methods]
                            })
                        elif isinstance(node, (python_ast.FunctionDef, python_ast.AsyncFunctionDef)):
                            if node.lineno not in method_linenos:
                                functions.append({
                                    'name': node.name,
                                    'line': node.lineno,
                                    'args': len(node.args.args)
                                })
                        elif isinstance(node, (python_ast.Import, python_ast.ImportFrom)):
                            if isinstance(node, python_ast.Import):
                                for name in node.names:
                                    imports.append(name.name)
                            else:
                                imports.append(node.module or 'relative')
                    ast_data.append({
                        'file': py_file.replace(self.working_path, ''),
                        'language': 'python',
                        'classes': classes,
                        'functions': functions,
                        'imports': list(set(imports)),
                        'lines': len(content.split('\n'))
                    })
                except SyntaxError as e:
                    logger.warning(f"Syntax error in {py_file}: {e}")
            except Exception as e:
                logger.warning(f"Error analyzing {py_file}: {e}")
        return ast_data
def _parse_sql_tables(self, sql_content: str) -> List[Dict[str, Any]]:
"""Parse SQL content to extract table definitions."""
tables = []
        # Naive regex for CREATE TABLE statements; it does not handle nested
        # parentheses (e.g. VARCHAR(255)) or quoted identifiers
        table_pattern = re.compile(
            r'CREATE\s+TABLE\s+(?:IF\s+NOT\s+EXISTS\s+)?(\w+)\s*\((.*?)\)',
            re.IGNORECASE | re.DOTALL
        )
matches = table_pattern.findall(sql_content)
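        # e.g. "CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT NOT NULL)"
        # yields table 'users' with columns 'id' (nullable) and 'name' (not nullable)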
for table_name, columns_str in matches:
columns = []
column_lines = [line.strip() for line in columns_str.split(',') if line.strip()]
for column_line in column_lines:
parts = column_line.split()
if len(parts) >= 2:
columns.append({
'name': parts[0].strip(),
'type': parts[1].strip(),
'nullable': 'NOT NULL' not in column_line.upper()
})
if columns:
tables.append({
'name': table_name,
'columns': columns
})
return tables
def _parse_python_models(self, content: str) -> List[Dict[str, Any]]:
"""Parse Python content to extract model definitions."""
models = []
# Look for class definitions that might be models
        class_pattern = re.compile(r'class\s+(\w+)\s*\([^)]*\):', re.MULTILINE)
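        # Only classes with an explicit base are matched, e.g. "class User(models.Model):" captures "User"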
matches = class_pattern.findall(content)
for class_name in matches:
# This is a simplified parser - in reality you'd want proper AST parsing
if any(keyword in content for keyword in ['Column', 'Field', 'models.', 'db.']):
models.append({
'name': class_name,
'type': 'model',
'fields': [] # Would need more sophisticated parsing for actual fields
})
return models