Skip to main content
Glama
analysis_service.py — 16.7 kB
"""Project analysis service.

Provides folder-structure analysis, code statistics and project
complexity estimation.
"""
import asyncio
from pathlib import Path
from typing import Dict, List, Any, Optional
from datetime import datetime
import mimetypes

from data_access import FileSystemInterface, CacheInterface


class AnalysisService:
    """Analyzes project folders: structure, file statistics and complexity."""

    def __init__(self, file_system: FileSystemInterface, cache_service: CacheInterface):
        """Initialize the analysis service.

        Args:
            file_system: File-system access interface (also provides the
                security validator and async ``read_file``).
            cache_service: Cache used to memoize analysis results.
        """
        self.file_system = file_system
        self.cache_service = cache_service
        # Extensions classified as source code by _determine_file_type.
        self.code_extensions = {
            '.py', '.js', '.ts', '.java', '.cpp', '.c', '.h', '.hpp',
            '.cs', '.php', '.rb', '.go', '.rs', '.swift', '.kt', '.scala',
            '.html', '.css', '.scss', '.less', '.vue', '.jsx', '.tsx'
        }
        # Extensions classified as documentation.
        self.doc_extensions = {
            '.md', '.txt', '.rst', '.adoc', '.doc', '.docx', '.pdf'
        }

    async def analyze_folder_structure(self, folder_path: str, depth: int = 3) -> Dict[str, Any]:
        """Analyze a folder's structure up to ``depth`` levels.

        Args:
            folder_path: Path of the folder to analyze.
            depth: Maximum recursion depth.

        Returns:
            Dict with the resolved path, recursive ``structure`` tree,
            aggregate ``statistics``, analysis timestamp and depth.
            ``from_cache`` is set to True when served from cache.

        Raises:
            FileNotFoundError: If the folder does not exist.
            RuntimeError: If the analysis fails (original error chained).
        """
        try:
            # Serve a cached result when available (5-minute TTL below).
            cache_key = f"folder_structure:{folder_path}:{depth}"
            cached_result = await self.cache_service.get(cache_key)
            if cached_result:
                cached_result['from_cache'] = True
                return cached_result

            # Security validation (path traversal etc.) before touching the FS.
            folder_path = self.file_system.security_validator.validate_path(folder_path)
            folder = Path(folder_path)

            if not folder.exists():
                raise FileNotFoundError(f"文件夹不存在: {folder_path}")
            if not folder.is_dir():
                raise ValueError(f"路径不是文件夹: {folder_path}")

            structure = await self._analyze_structure_recursive(folder, depth)
            stats = await self._generate_statistics(structure)

            result = {
                'path': str(folder.absolute()),
                'name': folder.name,
                'structure': structure,
                'statistics': stats,
                'analysis_time': datetime.now().isoformat(),
                'depth': depth
            }

            await self.cache_service.set(cache_key, result, expire_seconds=300)
            return result

        except (RuntimeError, ValueError) as e:
            raise RuntimeError(f"分析文件夹结构失败: {e}") from e

    async def _analyze_structure_recursive(self, folder: Path, max_depth: int,
                                           current_depth: int = 0) -> Dict[str, Any]:
        """Recursively analyze one folder level; children analyzed concurrently."""
        if current_depth >= max_depth:
            return {'type': 'folder', 'max_depth_reached': True}

        try:
            items = []
            files = []
            folders = []

            # Analyze all directory entries concurrently.
            item_tasks = [
                self._analyze_item(item, max_depth, current_depth + 1)
                for item in folder.iterdir()
            ]

            if item_tasks:
                item_results = await asyncio.gather(*item_tasks, return_exceptions=True)
                for result in item_results:
                    # Skip failed or unrecognized entries instead of crashing
                    # the whole folder (result may be an Exception from gather,
                    # or None for a defensively-skipped entry).
                    if result is None or isinstance(result, Exception):
                        continue
                    if result['type'] == 'file':
                        files.append(result)
                    elif result['type'] == 'folder':
                        folders.append(result)
                    items.append(result)

            # Deterministic ordering for display and caching.
            files.sort(key=lambda x: x['name'])
            folders.sort(key=lambda x: x['name'])
            items.sort(key=lambda x: (x['type'], x['name']))

            return {
                'type': 'folder',
                'name': folder.name,
                'path': str(folder.absolute()),
                'items': items,
                'files': files,
                'folders': folders,
                'file_count': len(files),
                'folder_count': len(folders),
                'total_count': len(items)
            }

        except PermissionError:
            # Unreadable folder: report it rather than abort the analysis.
            return {
                'type': 'folder',
                'name': folder.name,
                'path': str(folder.absolute()),
                'error': 'permission_denied'
            }

    async def _analyze_item(self, item: Path, max_depth: int, current_depth: int) -> Dict[str, Any]:
        """Analyze a single directory entry (file or sub-folder)."""
        try:
            if item.is_file():
                return await self._analyze_file(item)
            elif item.is_dir():
                if current_depth >= max_depth:
                    return {
                        'type': 'folder',
                        'name': item.name,
                        'path': str(item.absolute()),
                        'max_depth_reached': True
                    }
                return await self._analyze_structure_recursive(item, max_depth, current_depth)
            # BUGFIX: entries that are neither regular files nor directories
            # (broken symlinks, FIFOs, sockets) previously fell through and
            # returned None, causing a TypeError in the caller.
            return {
                'type': 'item',
                'name': item.name,
                'path': str(item.absolute()),
                'error': 'unsupported_type'
            }
        except (PermissionError, OSError):
            return {
                'type': 'item',
                'name': item.name,
                'path': str(item.absolute()),
                'error': 'access_denied'
            }

    async def _analyze_file(self, file: Path) -> Dict[str, Any]:
        """Collect metadata for a single file; light content probe for code files."""
        try:
            stat = file.stat()
            ext = file.suffix.lower()
            file_type = self._determine_file_type(ext)

            result = {
                'type': 'file',
                'name': file.name,
                'path': str(file.absolute()),
                'extension': ext,
                'size': stat.st_size,
                'size_human': self._format_size(stat.st_size),
                'modified_time': datetime.fromtimestamp(stat.st_mtime).isoformat(),
                'file_type': file_type,
                'mime_type': mimetypes.guess_type(str(file))[0] or 'application/octet-stream'
            }

            if file_type == 'code':
                try:
                    # Read only the head of the file for a cheap estimate.
                    content = await self.file_system.read_file(str(file), max_lines=50)
                    if content:
                        lines = content.split('\n')
                        result.update({
                            'line_count_estimate': len(lines),
                            'has_shebang': lines[0].startswith('#!') if lines else False,
                            'encoding': 'utf-8'  # assumed default — not detected
                        })
                except (IOError, PermissionError, RuntimeError):
                    # Best-effort probe: keep the basic metadata on read failure.
                    pass

            return result

        except (RuntimeError, ValueError) as e:
            return {
                'type': 'file',
                'name': file.name,
                'path': str(file.absolute()),
                'error': str(e)
            }

    def _determine_file_type(self, extension: str) -> str:
        """Map a lowercase file extension to a coarse category name."""
        if extension in self.code_extensions:
            return 'code'
        elif extension in self.doc_extensions:
            return 'documentation'
        elif extension in ['.jpg', '.jpeg', '.png', '.gif', '.bmp', '.svg', '.webp']:
            return 'image'
        elif extension in ['.mp4', '.avi', '.mkv', '.mov', '.wmv']:
            return 'video'
        elif extension in ['.mp3', '.wav', '.flac', '.aac', '.ogg']:
            return 'audio'
        elif extension in ['.zip', '.rar', '.7z', '.tar', '.gz']:
            return 'archive'
        elif extension in ['.json', '.xml', '.yaml', '.yml', '.toml', '.ini', '.conf']:
            return 'config'
        else:
            return 'other'

    def _format_size(self, size_bytes: int) -> str:
        """Format a byte count as a human-readable string (e.g. ``1.5 MB``)."""
        if size_bytes == 0:
            return "0 B"
        size_names = ["B", "KB", "MB", "GB", "TB"]
        i = 0
        size = float(size_bytes)
        while size >= 1024.0 and i < len(size_names) - 1:
            size /= 1024.0
            i += 1
        return f"{size:.1f} {size_names[i]}"

    async def _generate_statistics(self, structure: Dict[str, Any]) -> Dict[str, Any]:
        """Aggregate counts, sizes and per-type totals over the structure tree."""
        stats = {
            'total_files': 0,
            'total_folders': 0,
            'total_size': 0,
            'file_types': {},
            'largest_files': [],
            'code_files': 0,
            'doc_files': 0,
            'image_files': 0
        }

        await self._count_items_recursive(structure, stats)

        # Keep only the 10 largest files.
        if stats['largest_files']:
            stats['largest_files'] = sorted(
                stats['largest_files'],
                key=lambda x: x['size'],
                reverse=True
            )[:10]

        stats['total_size_human'] = self._format_size(stats['total_size'])
        return stats

    async def _count_items_recursive(self, item: Dict[str, Any], stats: Dict[str, Any]):
        """Fold one structure node (and its children) into ``stats`` in place."""
        node_type = item.get('type')
        if node_type == 'file':
            stats['total_files'] += 1

            size = item.get('size', 0)
            stats['total_size'] += size
            if size > 0:
                stats['largest_files'].append({
                    'name': item.get('name', ''),
                    'path': item.get('path', ''),
                    'size': size,
                    'size_human': self._format_size(size)
                })

            file_type = item.get('file_type', 'other')
            stats['file_types'][file_type] = stats['file_types'].get(file_type, 0) + 1
            if file_type == 'code':
                stats['code_files'] += 1
            elif file_type == 'documentation':
                stats['doc_files'] += 1
            elif file_type == 'image':
                stats['image_files'] += 1

        elif node_type == 'folder':
            stats['total_folders'] += 1

        # Recurse sequentially: this is pure CPU-bound dict walking, so the
        # previous gather(return_exceptions=True) gained no concurrency and
        # silently swallowed counting errors.
        for sub_item in item.get('items', []):
            await self._count_items_recursive(sub_item, stats)

    async def get_project_complexity(self, folder_path: str) -> Dict[str, Any]:
        """Estimate project complexity from structure statistics.

        Args:
            folder_path: Project root path.

        Returns:
            Dict with a numeric ``complexity_score``, a ``complexity_level``
            label, description, contributing ``factors`` and recommendations.

        Raises:
            RuntimeError: If the underlying analysis fails (chained).
        """
        try:
            analysis = await self.analyze_folder_structure(folder_path, depth=5)
            stats = analysis['statistics']

            complexity_score = 0
            factors = {}

            # Factor: total file count.
            file_count = stats['total_files']
            if file_count > 1000:
                complexity_score += 3
                factors['file_count'] = 'high'
            elif file_count > 100:
                complexity_score += 2
                factors['file_count'] = 'medium'
            else:
                complexity_score += 1
                factors['file_count'] = 'low'

            # Factor: folder nesting depth.
            max_depth = self._calculate_max_depth(analysis['structure'])
            if max_depth > 8:
                complexity_score += 3
                factors['folder_depth'] = 'deep'
            elif max_depth > 5:
                complexity_score += 2
                factors['folder_depth'] = 'medium'
            else:
                complexity_score += 1
                factors['folder_depth'] = 'shallow'

            # Factor: share of code files (max() guards against division by zero).
            code_ratio = stats['code_files'] / max(file_count, 1)
            if code_ratio > 0.7:
                complexity_score += 3
                factors['code_ratio'] = 'high'
            elif code_ratio > 0.3:
                complexity_score += 2
                factors['code_ratio'] = 'medium'
            else:
                complexity_score += 1
                factors['code_ratio'] = 'low'

            # Factor: diversity of file types.
            type_diversity = len(stats['file_types'])
            if type_diversity > 10:
                complexity_score += 2
                factors['type_diversity'] = 'high'
            elif type_diversity > 5:
                complexity_score += 1
                factors['type_diversity'] = 'medium'
            else:
                complexity_score += 0
                factors['type_diversity'] = 'low'

            # Map the accumulated score to a level and description.
            if complexity_score >= 10:
                complexity_level = 'very_high'
                description = '项目结构非常复杂,需要详细的文档和规范'
            elif complexity_score >= 7:
                complexity_level = 'high'
                description = '项目结构较复杂,建议增加文档和规范'
            elif complexity_score >= 4:
                complexity_level = 'medium'
                description = '项目结构中等复杂度,需要适当文档'
            else:
                complexity_level = 'low'
                description = '项目结构简单,易于理解和维护'

            return {
                'complexity_score': complexity_score,
                'complexity_level': complexity_level,
                'description': description,
                'factors': factors,
                'recommendations': self._generate_complexity_recommendations(complexity_level, factors),
                'analysis_timestamp': datetime.now().isoformat()
            }

        except (RuntimeError, ValueError) as e:
            raise RuntimeError(f"评估项目复杂度失败: {e}") from e

    def _calculate_max_depth(self, structure: Dict[str, Any], current_depth: int = 1) -> int:
        """Return the deepest folder level found in the structure tree."""
        max_depth = current_depth
        if structure.get('type') == 'folder':
            for item in structure.get('folders', []):
                # Truncated branches carry no real depth information.
                if not item.get('max_depth_reached'):
                    depth = self._calculate_max_depth(item, current_depth + 1)
                    max_depth = max(max_depth, depth)
        return max_depth

    def _generate_complexity_recommendations(self, complexity_level: str,
                                             factors: Dict[str, str]) -> List[str]:
        """Build actionable recommendations from the complexity factors."""
        recommendations = []

        if factors.get('file_count') == 'high':
            recommendations.append('考虑将项目拆分为多个子模块或包')
            recommendations.append('建立完善的索引和导航文档')

        if factors.get('folder_depth') == 'deep':
            recommendations.append('简化目录结构,减少嵌套层级')
            recommendations.append('使用清晰的命名约定提高可读性')

        if factors.get('code_ratio') == 'high':
            recommendations.append('增加代码注释和API文档')
            recommendations.append('建立代码规范和最佳实践指南')

        if factors.get('type_diversity') == 'high':
            recommendations.append('为不同类型文件建立管理规范')
            recommendations.append('提供文件类型说明和使用指南')

        if complexity_level in ['high', 'very_high']:
            recommendations.append('定期更新和维护文档')
            recommendations.append('建立新人入职指南')

        return recommendations

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/kscz0000/Zhiwen-Assistant-MCP'

If you have feedback or need assistance with the MCP directory API, please join our Discord server