"""
项目分析服务
提供文件夹结构分析、代码统计、复杂度分析等功能。
"""
import asyncio
from pathlib import Path
from typing import Dict, List, Any, Optional
from datetime import datetime
import mimetypes
from data_access import FileSystemInterface, CacheInterface
class AnalysisService:
    """Project analysis service.

    Walks a folder tree, collects per-file metadata, aggregates statistics
    over the resulting tree and derives a coarse complexity rating from them.
    """

    # Depth used when scanning a project for the complexity rating. It has to
    # exceed the deepest folder-depth threshold (8) used by
    # get_project_complexity, otherwise the 'medium'/'deep' ratings can never
    # be reached.
    _COMPLEXITY_SCAN_DEPTH = 9

    # Extension groups that are fixed (not configurable per instance), checked
    # in this order after the code/documentation sets.
    _EXTENSION_CATEGORIES = (
        ('image', frozenset({'.jpg', '.jpeg', '.png', '.gif', '.bmp', '.svg', '.webp'})),
        ('video', frozenset({'.mp4', '.avi', '.mkv', '.mov', '.wmv'})),
        ('audio', frozenset({'.mp3', '.wav', '.flac', '.aac', '.ogg'})),
        ('archive', frozenset({'.zip', '.rar', '.7z', '.tar', '.gz'})),
        ('config', frozenset({'.json', '.xml', '.yaml', '.yml', '.toml', '.ini', '.conf'})),
    )

    def __init__(self, file_system: "FileSystemInterface", cache_service: "CacheInterface"):
        """
        Initialise the analysis service.

        Args:
            file_system: File system interface. This class uses its
                ``security_validator.validate_path`` and ``read_file`` members.
            cache_service: Cache service. This class awaits its ``get`` and
                ``set`` members.
        """
        self.file_system = file_system
        self.cache_service = cache_service
        self.code_extensions = {
            '.py', '.js', '.ts', '.java', '.cpp', '.c', '.h', '.hpp',
            '.cs', '.php', '.rb', '.go', '.rs', '.swift', '.kt', '.scala',
            '.html', '.css', '.scss', '.less', '.vue', '.jsx', '.tsx'
        }
        self.doc_extensions = {
            '.md', '.txt', '.rst', '.adoc', '.doc', '.docx', '.pdf'
        }

    async def analyze_folder_structure(self, folder_path: str, depth: int = 3) -> Dict[str, Any]:
        """
        Analyse the structure of a folder.

        Args:
            folder_path: Path of the folder to analyse.
            depth: Maximum number of folder levels to descend into.

        Returns:
            A dict with the keys ``path``, ``name``, ``structure``,
            ``statistics``, ``analysis_time`` and ``depth``. When the result
            is served from the cache it additionally carries
            ``from_cache: True``.

        Raises:
            FileNotFoundError: If the folder does not exist.
            RuntimeError: If a ``RuntimeError`` or ``ValueError`` occurs during
                the analysis (this includes the "not a folder" case).
        """
        try:
            # The key is built from the path exactly as the caller passed it.
            cache_key = f"folder_structure:{folder_path}:{depth}"

            # Validate before consulting the cache so a cache hit can never
            # bypass the security check.
            folder_path = self.file_system.security_validator.validate_path(folder_path)

            cached_result = await self.cache_service.get(cache_key)
            if cached_result:
                # Return a copy: the flag must not leak into the cached object.
                return {**cached_result, 'from_cache': True}

            folder = Path(folder_path)
            if not folder.exists():
                raise FileNotFoundError(f"文件夹不存在: {folder_path}")
            if not folder.is_dir():
                raise ValueError(f"路径不是文件夹: {folder_path}")

            structure = await self._analyze_structure_recursive(folder, depth)
            stats = await self._generate_statistics(structure)
            result = {
                'path': str(folder.absolute()),
                'name': folder.name,
                'structure': structure,
                'statistics': stats,
                'analysis_time': datetime.now().isoformat(),
                'depth': depth
            }

            await self.cache_service.set(cache_key, result, expire_seconds=300)
            return result
        except (RuntimeError, ValueError) as e:
            raise RuntimeError(f"分析文件夹结构失败: {e}") from e

    async def _analyze_structure_recursive(self, folder: Path, max_depth: int, current_depth: int = 0) -> Dict[str, Any]:
        """Recursively analyse ``folder`` and return its tree node.

        Children are analysed concurrently. A child whose analysis raised is
        kept in ``items`` as an error entry instead of being dropped.
        """
        if current_depth >= max_depth:
            return {
                'type': 'folder',
                'name': folder.name,
                'path': str(folder.absolute()),
                'max_depth_reached': True
            }
        try:
            # List the directory completely before creating any coroutine, so
            # a listing failure cannot leave coroutines that are never awaited.
            children = list(folder.iterdir())
        except PermissionError:
            return {
                'type': 'folder',
                'name': folder.name,
                'path': str(folder.absolute()),
                'error': 'permission_denied'
            }

        items = []
        files = []
        folders = []
        if children:
            outcomes = await asyncio.gather(
                *(self._analyze_item(child, max_depth, current_depth + 1) for child in children),
                return_exceptions=True
            )
            for child, outcome in zip(children, outcomes):
                if isinstance(outcome, BaseException):
                    # Keep a trace of the failure rather than losing the entry.
                    outcome = {
                        'type': 'item',
                        'name': child.name,
                        'path': str(child.absolute()),
                        'error': f"{type(outcome).__name__}: {outcome}"
                    }
                if outcome['type'] == 'file':
                    files.append(outcome)
                elif outcome['type'] == 'folder':
                    folders.append(outcome)
                items.append(outcome)

        # Sort by name; the combined list is grouped by entry type first.
        files.sort(key=lambda x: x['name'])
        folders.sort(key=lambda x: x['name'])
        items.sort(key=lambda x: (x['type'], x['name']))
        return {
            'type': 'folder',
            'name': folder.name,
            'path': str(folder.absolute()),
            'items': items,
            'files': files,
            'folders': folders,
            'file_count': len(files),
            'folder_count': len(folders),
            'total_count': len(items)
        }

    async def _analyze_item(self, item: Path, max_depth: int, current_depth: int) -> Dict[str, Any]:
        """Analyse a single directory entry.

        Always returns a dict: a file node, a folder node, or an ``item`` node
        carrying an ``error`` key when the entry cannot be analysed.
        """
        try:
            if item.is_file():
                return await self._analyze_file(item)
            if item.is_dir():
                if current_depth >= max_depth:
                    return {
                        'type': 'folder',
                        'name': item.name,
                        'path': str(item.absolute()),
                        'max_depth_reached': True
                    }
                return await self._analyze_structure_recursive(item, max_depth, current_depth)
        except (PermissionError, OSError):
            return {
                'type': 'item',
                'name': item.name,
                'path': str(item.absolute()),
                'error': 'access_denied'
            }
        # Neither a regular file nor a directory (e.g. a broken symlink or an
        # entry that vanished). Returning a node here keeps callers from
        # receiving None.
        return {
            'type': 'item',
            'name': item.name,
            'path': str(item.absolute()),
            'error': 'unsupported_type'
        }

    async def _analyze_file(self, file: Path) -> Dict[str, Any]:
        """Collect metadata for a single file.

        For code files the first lines are read through the file system
        interface to add ``line_count_estimate``, ``has_shebang`` and
        ``encoding``. A failure of that preview is ignored and only the basic
        metadata is returned.
        """
        try:
            stat = file.stat()
            ext = file.suffix.lower()
            file_type = self._determine_file_type(ext)
            result = {
                'type': 'file',
                'name': file.name,
                'path': str(file.absolute()),
                'extension': ext,
                'size': stat.st_size,
                'size_human': self._format_size(stat.st_size),
                'modified_time': datetime.fromtimestamp(stat.st_mtime).isoformat(),
                'file_type': file_type,
                'mime_type': mimetypes.guess_type(str(file))[0] or 'application/octet-stream'
            }
            if file_type == 'code':
                try:
                    # Only the head of the file is requested, hence "estimate".
                    content = await self.file_system.read_file(str(file), max_lines=50)
                    if content:
                        lines = content.split('\n')
                        result.update({
                            'line_count_estimate': len(lines),
                            'has_shebang': lines[0].startswith('#!'),
                            'encoding': 'utf-8'  # default encoding, not detected
                        })
                except (OSError, RuntimeError, ValueError):
                    # Best effort: an unreadable or undecodable file still
                    # reports its basic metadata.
                    pass
            return result
        except (RuntimeError, ValueError) as e:
            return {
                'type': 'file',
                'name': file.name,
                'path': str(file.absolute()),
                'error': str(e)
            }

    def _determine_file_type(self, extension: str) -> str:
        """Map a lower-cased file extension (with leading dot) to a category.

        Returns one of ``code``, ``documentation``, ``image``, ``video``,
        ``audio``, ``archive``, ``config`` or ``other``.
        """
        if extension in self.code_extensions:
            return 'code'
        if extension in self.doc_extensions:
            return 'documentation'
        for category, extensions in self._EXTENSION_CATEGORIES:
            if extension in extensions:
                return category
        return 'other'

    def _format_size(self, size_bytes: int) -> str:
        """Format a byte count as a human readable string (1024-based units)."""
        if size_bytes == 0:
            return "0 B"
        size_names = ["B", "KB", "MB", "GB", "TB"]
        value = float(size_bytes)
        unit = 0
        # Stop at the largest known unit even if the value is still >= 1024.
        while value >= 1024.0 and unit < len(size_names) - 1:
            value /= 1024.0
            unit += 1
        return f"{value:.1f} {size_names[unit]}"

    async def _generate_statistics(self, structure: Dict[str, Any]) -> Dict[str, Any]:
        """Aggregate statistics over an analysed folder tree.

        Returns totals for files, folders and size, a per-category file count,
        the ten largest files and a human readable total size.
        """
        stats = {
            'total_files': 0,
            'total_folders': 0,
            'total_size': 0,
            'file_types': {},
            'largest_files': [],
            'code_files': 0,
            'doc_files': 0,
            'image_files': 0
        }
        await self._count_items_recursive(structure, stats)

        # Keep only the ten largest files.
        stats['largest_files'] = sorted(
            stats['largest_files'],
            key=lambda x: x['size'],
            reverse=True
        )[:10]
        stats['total_size_human'] = self._format_size(stats['total_size'])
        return stats

    async def _count_items_recursive(self, item: Dict[str, Any], stats: Dict[str, Any]):
        """Accumulate the counters in ``stats`` for ``item`` and its children.

        ``stats`` is updated in place. The walk is sequential: the work is
        pure bookkeeping, so scheduling a task per node would only add
        overhead and hide errors.
        """
        kind = item.get('type')
        if kind == 'file':
            stats['total_files'] += 1
            size = item.get('size', 0)
            stats['total_size'] += size
            # Empty files are not candidates for the "largest files" list.
            if size > 0:
                stats['largest_files'].append({
                    'name': item.get('name', ''),
                    'path': item.get('path', ''),
                    'size': size,
                    'size_human': self._format_size(size)
                })
            file_type = item.get('file_type', 'other')
            stats['file_types'][file_type] = stats['file_types'].get(file_type, 0) + 1
            if file_type == 'code':
                stats['code_files'] += 1
            elif file_type == 'documentation':
                stats['doc_files'] += 1
            elif file_type == 'image':
                stats['image_files'] += 1
        elif kind == 'folder':
            stats['total_folders'] += 1
            for sub_item in item.get('items', []):
                await self._count_items_recursive(sub_item, stats)

    @staticmethod
    def _rate(value, tiers, fallback):
        """Return the ``(points, label)`` of the first tier whose limit ``value`` exceeds.

        ``tiers`` is a sequence of ``(limit, points, label)`` ordered from the
        highest limit down; ``fallback`` is the ``(points, label)`` used when
        no limit is exceeded.
        """
        for limit, points, label in tiers:
            if value > limit:
                return points, label
        return fallback

    async def get_project_complexity(self, folder_path: str) -> Dict[str, Any]:
        """
        Rate the complexity of a project.

        The score is the sum of four factors: file count, folder depth, share
        of code files and number of distinct file categories.

        Args:
            folder_path: Path of the project.

        Returns:
            A dict with ``complexity_score``, ``complexity_level``,
            ``description``, ``factors``, ``recommendations`` and
            ``analysis_timestamp``.

        Raises:
            RuntimeError: If a ``RuntimeError`` or ``ValueError`` occurs while
                analysing or rating the project.
        """
        try:
            analysis = await self.analyze_folder_structure(
                folder_path, depth=self._COMPLEXITY_SCAN_DEPTH
            )
            stats = analysis['statistics']
            factors = {}

            file_count = stats['total_files']
            file_points, factors['file_count'] = self._rate(
                file_count, ((1000, 3, 'high'), (100, 2, 'medium')), (1, 'low')
            )

            max_depth = self._calculate_max_depth(analysis['structure'])
            depth_points, factors['folder_depth'] = self._rate(
                max_depth, ((8, 3, 'deep'), (5, 2, 'medium')), (1, 'shallow')
            )

            # max(..., 1) avoids a division by zero for empty projects.
            code_ratio = stats['code_files'] / max(file_count, 1)
            ratio_points, factors['code_ratio'] = self._rate(
                code_ratio, ((0.7, 3, 'high'), (0.3, 2, 'medium')), (1, 'low')
            )

            # NOTE(review): _determine_file_type yields at most 8 categories,
            # so the "> 10" tier cannot trigger with the current categories.
            type_diversity = len(stats['file_types'])
            diversity_points, factors['type_diversity'] = self._rate(
                type_diversity, ((10, 2, 'high'), (5, 1, 'medium')), (0, 'low')
            )

            complexity_score = file_points + depth_points + ratio_points + diversity_points

            if complexity_score >= 10:
                complexity_level = 'very_high'
                description = '项目结构非常复杂,需要详细的文档和规范'
            elif complexity_score >= 7:
                complexity_level = 'high'
                description = '项目结构较复杂,建议增加文档和规范'
            elif complexity_score >= 4:
                complexity_level = 'medium'
                description = '项目结构中等复杂度,需要适当文档'
            else:
                complexity_level = 'low'
                description = '项目结构简单,易于理解和维护'

            return {
                'complexity_score': complexity_score,
                'complexity_level': complexity_level,
                'description': description,
                'factors': factors,
                'recommendations': self._generate_complexity_recommendations(complexity_level, factors),
                'analysis_timestamp': datetime.now().isoformat()
            }
        except (RuntimeError, ValueError) as e:
            raise RuntimeError(f"评估项目复杂度失败: {e}") from e

    def _calculate_max_depth(self, structure: Dict[str, Any], current_depth: int = 1) -> int:
        """Return the deepest folder level below ``structure``.

        The node itself counts as ``current_depth``. Folders that were cut off
        by the scan depth limit (``max_depth_reached``) are not counted.
        """
        max_depth = current_depth
        if structure.get('type') == 'folder':
            for sub_folder in structure.get('folders', []):
                if not sub_folder.get('max_depth_reached'):
                    max_depth = max(
                        max_depth,
                        self._calculate_max_depth(sub_folder, current_depth + 1)
                    )
        return max_depth

    def _generate_complexity_recommendations(self, complexity_level: str, factors: Dict[str, str]) -> List[str]:
        """Build the list of recommendations for the given rating.

        Each factor rated at its highest tier contributes two entries; a
        ``high`` or ``very_high`` overall level adds two more.
        """
        recommendations = []
        if factors.get('file_count') == 'high':
            recommendations.append('考虑将项目拆分为多个子模块或包')
            recommendations.append('建立完善的索引和导航文档')
        if factors.get('folder_depth') == 'deep':
            recommendations.append('简化目录结构,减少嵌套层级')
            recommendations.append('使用清晰的命名约定提高可读性')
        if factors.get('code_ratio') == 'high':
            recommendations.append('增加代码注释和API文档')
            recommendations.append('建立代码规范和最佳实践指南')
        if factors.get('type_diversity') == 'high':
            recommendations.append('为不同类型文件建立管理规范')
            recommendations.append('提供文件类型说明和使用指南')
        if complexity_level in ['high', 'very_high']:
            recommendations.append('定期更新和维护文档')
            recommendations.append('建立新人入职指南')
        return recommendations