# We provide all the information about MCP servers via our MCP API.
# curl -X GET 'https://glama.ai/api/mcp/v1/servers/kscz0000/Zhiwen-Assistant-MCP'
# If you have feedback or need assistance with the MCP directory API, please join our Discord server.
"""
项目分析服务
提供文件夹结构分析、代码统计、复杂度分析等功能。
"""
import asyncio
from pathlib import Path
from typing import Dict, List, Any, Optional
from datetime import datetime
import mimetypes
from data_access import FileSystemInterface, CacheInterface, CacheError
class AnalysisService:
    """Project analysis service.

    Provides folder-structure analysis, aggregate file statistics and
    project-complexity estimation.  All file access goes through the
    injected ``FileSystemInterface``; full analysis results are memoized
    via the injected ``CacheInterface``.
    """

    def __init__(self, file_system: FileSystemInterface, cache_service: CacheInterface):
        """
        Initialize the analysis service.

        Args:
            file_system: File-system access interface (path validation, reads).
            cache_service: Cache backend used to memoize analysis results.
        """
        self.file_system = file_system
        self.cache_service = cache_service
        # Extensions classified as source code.
        self.code_extensions = {
            '.py', '.js', '.ts', '.java', '.cpp', '.c', '.h', '.hpp',
            '.cs', '.php', '.rb', '.go', '.rs', '.swift', '.kt', '.scala',
            '.html', '.css', '.scss', '.less', '.vue', '.jsx', '.tsx'
        }
        # Extensions classified as documentation.
        self.doc_extensions = {
            '.md', '.txt', '.rst', '.adoc', '.doc', '.docx', '.pdf'
        }

    async def analyze_folder_structure(self, folder_path: str, depth: int = 3) -> Dict[str, Any]:
        """
        Analyze a folder's structure.

        Args:
            folder_path: Path of the folder to analyze.
            depth: Maximum recursion depth.

        Returns:
            Dict with the folder's path/name, the recursive ``structure``
            tree, aggregate ``statistics``, an ISO analysis timestamp and
            the depth used.  ``from_cache`` is set on cache hits.

        Raises:
            FileNotFoundError: If the folder does not exist (propagates
                unchanged so callers can distinguish it).
            RuntimeError: If validation or analysis fails.
        """
        try:
            cache_key = f"folder_structure:{folder_path}:{depth}"
            # A failing cache backend must not break the analysis;
            # degrade to a cache miss instead.
            try:
                cached_result = await self.cache_service.get(cache_key)
            except CacheError:
                cached_result = None
            if cached_result:
                cached_result['from_cache'] = True
                return cached_result

            # Security check: reject paths outside the allowed scope.
            is_valid, error_msg = await self.file_system.validate_path(folder_path)
            if not is_valid:
                raise ValueError(f"路径验证失败: {error_msg}")

            folder = Path(folder_path)
            if not folder.exists():
                raise FileNotFoundError(f"文件夹不存在: {folder_path}")
            if not folder.is_dir():
                raise ValueError(f"路径不是文件夹: {folder_path}")

            # Walk the tree, then aggregate statistics over it.
            structure = await self._analyze_structure_recursive(folder, depth)
            stats = await self._generate_statistics(structure)

            result = {
                'path': str(folder.absolute()),
                'name': folder.name,
                'structure': structure,
                'statistics': stats,
                'analysis_time': datetime.now().isoformat(),
                'depth': depth
            }

            # Best-effort caching: ignore backend failures.
            try:
                await self.cache_service.set(cache_key, result, expire_seconds=300)
            except CacheError:
                pass
            return result
        except (RuntimeError, ValueError) as e:
            raise RuntimeError(f"分析文件夹结构失败: {e}") from e

    async def _analyze_structure_recursive(self, folder: Path, max_depth: int, current_depth: int = 0) -> Dict[str, Any]:
        """Recursively analyze *folder*, stopping at *max_depth*."""
        if current_depth >= max_depth:
            return {'type': 'folder', 'max_depth_reached': True}
        try:
            items: List[Dict[str, Any]] = []
            files: List[Dict[str, Any]] = []
            folders: List[Dict[str, Any]] = []

            # Analyze all children concurrently.
            item_tasks = [
                self._analyze_item(child, max_depth, current_depth + 1)
                for child in folder.iterdir()
            ]
            if item_tasks:
                item_results = await asyncio.gather(*item_tasks, return_exceptions=True)
                for result in item_results:
                    # Skip failed children (and, defensively, None results)
                    # so a single bad entry cannot abort the whole subtree.
                    if result is None or isinstance(result, Exception):
                        continue
                    if result['type'] == 'file':
                        files.append(result)
                    elif result['type'] == 'folder':
                        folders.append(result)
                    items.append(result)

            # Deterministic ordering for stable, cache-friendly output.
            files.sort(key=lambda x: x['name'])
            folders.sort(key=lambda x: x['name'])
            items.sort(key=lambda x: (x['type'], x['name']))

            return {
                'type': 'folder',
                'name': folder.name,
                'path': str(folder.absolute()),
                'items': items,
                'files': files,
                'folders': folders,
                'file_count': len(files),
                'folder_count': len(folders),
                'total_count': len(items)
            }
        except PermissionError:
            return {
                'type': 'folder',
                'name': folder.name,
                'path': str(folder.absolute()),
                'error': 'permission_denied'
            }

    async def _analyze_item(self, item: Path, max_depth: int, current_depth: int) -> Dict[str, Any]:
        """Analyze a single file or folder entry, never returning None."""
        try:
            if item.is_file():
                return await self._analyze_file(item)
            elif item.is_dir():
                if current_depth >= max_depth:
                    return {
                        'type': 'folder',
                        'name': item.name,
                        'path': str(item.absolute()),
                        'max_depth_reached': True
                    }
                return await self._analyze_structure_recursive(item, max_depth, current_depth)
            else:
                # Neither a regular file nor a directory (e.g. a broken
                # symlink or a special file).  Previously this fell through
                # and returned None, which crashed the caller's
                # result['type'] access and silently dropped the subtree.
                return {
                    'type': 'item',
                    'name': item.name,
                    'path': str(item.absolute()),
                    'error': 'unsupported_type'
                }
        except (PermissionError, OSError):
            return {
                'type': 'item',
                'name': item.name,
                'path': str(item.absolute()),
                'error': 'access_denied'
            }

    async def _analyze_file(self, file: Path) -> Dict[str, Any]:
        """Collect metadata for a single file; returns an error record on failure."""
        try:
            stat = file.stat()
            ext = file.suffix.lower()
            file_type = self._determine_file_type(ext)

            result = {
                'type': 'file',
                'name': file.name,
                'path': str(file.absolute()),
                'extension': ext,
                'size': stat.st_size,
                'size_human': self._format_size(stat.st_size),
                'modified_time': datetime.fromtimestamp(stat.st_mtime).isoformat(),
                'file_type': file_type,
                'mime_type': mimetypes.guess_type(str(file))[0] or 'application/octet-stream'
            }

            # For code files, peek at the head of the file for a cheap analysis.
            if file_type == 'code':
                try:
                    content = await self.file_system.read_file(str(file), max_lines=50)
                    if content:
                        lines = content.split('\n')
                        result.update({
                            'line_count_estimate': len(lines),
                            'has_shebang': lines[0].startswith('#!') if lines else False,
                            'encoding': 'utf-8'  # assumed default; not actually detected
                        })
                except (IOError, PermissionError, RuntimeError):
                    # Best effort: fall back to the basic metadata.
                    pass
            return result
        except (OSError, RuntimeError, ValueError) as e:
            # OSError covers stat() failures, which previously escaped this
            # handler and caused the entry to be dropped upstream.
            return {
                'type': 'file',
                'name': file.name,
                'path': str(file.absolute()),
                'error': str(e)
            }

    def _determine_file_type(self, extension: str) -> str:
        """Classify a (lower-cased) file extension into a coarse category."""
        if extension in self.code_extensions:
            return 'code'
        elif extension in self.doc_extensions:
            return 'documentation'
        elif extension in ['.jpg', '.jpeg', '.png', '.gif', '.bmp', '.svg', '.webp']:
            return 'image'
        elif extension in ['.mp4', '.avi', '.mkv', '.mov', '.wmv']:
            return 'video'
        elif extension in ['.mp3', '.wav', '.flac', '.aac', '.ogg']:
            return 'audio'
        elif extension in ['.zip', '.rar', '.7z', '.tar', '.gz']:
            return 'archive'
        elif extension in ['.json', '.xml', '.yaml', '.yml', '.toml', '.ini', '.conf']:
            return 'config'
        else:
            return 'other'

    def _format_size(self, size_bytes: int) -> str:
        """Format a byte count as a human-readable string (e.g. '1.5 MB')."""
        if size_bytes == 0:
            return "0 B"
        size_names = ["B", "KB", "MB", "GB", "TB"]
        i = 0
        while size_bytes >= 1024.0 and i < len(size_names) - 1:
            size_bytes /= 1024.0
            i += 1
        return f"{size_bytes:.1f} {size_names[i]}"

    async def _generate_statistics(self, structure: Dict[str, Any]) -> Dict[str, Any]:
        """Aggregate counts, sizes and type breakdowns over a structure tree."""
        stats: Dict[str, Any] = {
            'total_files': 0,
            'total_folders': 0,
            'total_size': 0,
            'file_types': {},
            'largest_files': [],
            'code_files': 0,
            'doc_files': 0,
            'image_files': 0
        }
        await self._count_items_recursive(structure, stats)

        # Keep only the ten largest files.
        if stats['largest_files']:
            stats['largest_files'] = sorted(
                stats['largest_files'],
                key=lambda x: x['size'],
                reverse=True
            )[:10]

        stats['total_size_human'] = self._format_size(stats['total_size'])
        return stats

    async def _count_items_recursive(self, item: Dict[str, Any], stats: Dict[str, Any]) -> None:
        """Accumulate statistics for *item* and its children into *stats*."""
        if item.get('type') == 'file':
            stats['total_files'] += 1
            size = item.get('size', 0)
            stats['total_size'] += size
            if size > 0:
                stats['largest_files'].append({
                    'name': item.get('name', ''),
                    'path': item.get('path', ''),
                    'size': size,
                    'size_human': self._format_size(size)
                })
            file_type = item.get('file_type', 'other')
            stats['file_types'][file_type] = stats['file_types'].get(file_type, 0) + 1
            if file_type == 'code':
                stats['code_files'] += 1
            elif file_type == 'documentation':
                stats['doc_files'] += 1
            elif file_type == 'image':
                stats['image_files'] += 1
        elif item.get('type') == 'folder':
            stats['total_folders'] += 1

        # Recurse sequentially: this is a pure in-memory walk with no real
        # suspension points, so the previous gather(return_exceptions=True)
        # added nothing except silently swallowing errors.
        for sub_item in item.get('items', []):
            await self._count_items_recursive(sub_item, stats)

    async def get_project_complexity(self, folder_path: str) -> Dict[str, Any]:
        """
        Estimate a project's structural complexity.

        Args:
            folder_path: Path of the project root.

        Returns:
            Dict with a numeric ``complexity_score``, a ``complexity_level``
            (low/medium/high/very_high), a description, the contributing
            ``factors`` and tailored ``recommendations``.

        Raises:
            RuntimeError: If the underlying structure analysis fails.
        """
        try:
            # Reuse the structure analysis (depth 5 gives enough signal).
            analysis = await self.analyze_folder_structure(folder_path, depth=5)
            stats = analysis['statistics']

            complexity_score = 0
            factors: Dict[str, str] = {}

            # Factor 1: number of files.
            file_count = stats['total_files']
            if file_count > 1000:
                complexity_score += 3
                factors['file_count'] = 'high'
            elif file_count > 100:
                complexity_score += 2
                factors['file_count'] = 'medium'
            else:
                complexity_score += 1
                factors['file_count'] = 'low'

            # Factor 2: folder nesting depth.
            max_depth = self._calculate_max_depth(analysis['structure'])
            if max_depth > 8:
                complexity_score += 3
                factors['folder_depth'] = 'deep'
            elif max_depth > 5:
                complexity_score += 2
                factors['folder_depth'] = 'medium'
            else:
                complexity_score += 1
                factors['folder_depth'] = 'shallow'

            # Factor 3: proportion of code files.
            code_ratio = stats['code_files'] / max(file_count, 1)
            if code_ratio > 0.7:
                complexity_score += 3
                factors['code_ratio'] = 'high'
            elif code_ratio > 0.3:
                complexity_score += 2
                factors['code_ratio'] = 'medium'
            else:
                complexity_score += 1
                factors['code_ratio'] = 'low'

            # Factor 4: diversity of file types.
            type_diversity = len(stats['file_types'])
            if type_diversity > 10:
                complexity_score += 2
                factors['type_diversity'] = 'high'
            elif type_diversity > 5:
                complexity_score += 1
                factors['type_diversity'] = 'medium'
            else:
                factors['type_diversity'] = 'low'

            # Map the total score to a level and description.
            if complexity_score >= 10:
                complexity_level = 'very_high'
                description = '项目结构非常复杂,需要详细的文档和规范'
            elif complexity_score >= 7:
                complexity_level = 'high'
                description = '项目结构较复杂,建议增加文档和规范'
            elif complexity_score >= 4:
                complexity_level = 'medium'
                description = '项目结构中等复杂度,需要适当文档'
            else:
                complexity_level = 'low'
                description = '项目结构简单,易于理解和维护'

            return {
                'complexity_score': complexity_score,
                'complexity_level': complexity_level,
                'description': description,
                'factors': factors,
                'recommendations': self._generate_complexity_recommendations(complexity_level, factors),
                'analysis_timestamp': datetime.now().isoformat()
            }
        except (RuntimeError, ValueError) as e:
            raise RuntimeError(f"评估项目复杂度失败: {e}") from e

    def _calculate_max_depth(self, structure: Dict[str, Any], current_depth: int = 1) -> int:
        """Compute the deepest folder level reached in a structure tree."""
        max_depth = current_depth
        if structure.get('type') == 'folder':
            for sub_folder in structure.get('folders', []):
                # Truncated subtrees carry no real depth information.
                if not sub_folder.get('max_depth_reached'):
                    depth = self._calculate_max_depth(sub_folder, current_depth + 1)
                    max_depth = max(max_depth, depth)
        return max_depth

    def _generate_complexity_recommendations(self, complexity_level: str, factors: Dict[str, str]) -> List[str]:
        """Produce actionable recommendations from the complexity factors."""
        recommendations: List[str] = []
        if factors.get('file_count') == 'high':
            recommendations.append('考虑将项目拆分为多个子模块或包')
            recommendations.append('建立完善的索引和导航文档')
        if factors.get('folder_depth') == 'deep':
            recommendations.append('简化目录结构,减少嵌套层级')
            recommendations.append('使用清晰的命名约定提高可读性')
        if factors.get('code_ratio') == 'high':
            recommendations.append('增加代码注释和API文档')
            recommendations.append('建立代码规范和最佳实践指南')
        if factors.get('type_diversity') == 'high':
            recommendations.append('为不同类型文件建立管理规范')
            recommendations.append('提供文件类型说明和使用指南')
        if complexity_level in ['high', 'very_high']:
            recommendations.append('定期更新和维护文档')
            recommendations.append('建立新人入职指南')
        return recommendations