"""
分析工具
提供项目结构分析、复杂度评估等工具。
"""
from typing import Dict, Any, List, Optional
from datetime import datetime
from data_access import FileSystemInterface
from services import AnalysisService
def register_analysis_tools(mcp_server, analysis_service: AnalysisService,
                            file_system: FileSystemInterface):
    """Register the analysis tools on the given MCP server.

    Three coroutines are registered through ``mcp_server.tool()``:
    ``folder_docs_analyze_structure``, ``folder_docs_evaluate_complexity``
    and ``folder_docs_generate_report``. Each one validates its path with
    ``file_system.security_validator.validate_path`` and turns a
    ``RuntimeError``/``ValueError`` into a ``{"success": False, ...}`` dict
    instead of letting it propagate.

    Args:
        mcp_server: Server object exposing a ``tool()`` decorator factory
            (documented by the original author as a FastMCP server).
        analysis_service: Provides the ``analyze_folder_structure`` and
            ``get_project_complexity`` coroutines.
        file_system: Provides ``security_validator`` and the ``write_file``
            coroutine.
    """
    # NOTE: the docstrings of the three tool functions below are left
    # untranslated on purpose. The server may publish a tool's docstring as
    # its user-facing description, so their text is treated as runtime data.

    @mcp_server.tool()
    async def folder_docs_analyze_structure(
        folder_path: str,
        depth: int = 3,
        include_files: bool = True,
        include_stats: bool = True,
        export_format: str = "json"
    ) -> Dict[str, Any]:
        """
        分析文件夹结构

        深入分析指定文件夹的结构,包括文件统计、类型分布等。

        Args:
            folder_path: 要分析的文件夹路径
            depth: 分析深度(嵌套层数)
            include_files: 是否包含文件详情
            include_stats: 是否包含统计信息
            export_format: 导出格式 (json, yaml, markdown)

        Returns:
            文件夹结构分析结果,包含层次结构、统计信息等
        """
        try:
            # Security validation; the validated path is used from here on.
            folder_path = file_system.security_validator.validate_path(folder_path)

            analysis_result = await analysis_service.analyze_folder_structure(folder_path, depth)

            # Trim the result according to the caller's flags.
            if not include_files:
                analysis_result = filter_files_from_analysis(analysis_result)
            if not include_stats:
                # Build a new dict rather than pop() in place: the service
                # owns the object it returned and may hand it out again.
                analysis_result = {
                    key: value
                    for key, value in analysis_result.items()
                    if key != 'statistics'
                }

            # Any format other than yaml/markdown returns the raw dict.
            requested_format = export_format.lower()
            if requested_format == "yaml":
                formatted_result = await format_analysis_as_yaml(analysis_result)
            elif requested_format == "markdown":
                formatted_result = await format_analysis_as_markdown(analysis_result)
            else:
                formatted_result = analysis_result

            return {
                "success": True,
                "folder_path": folder_path,
                "analysis": formatted_result,
                "depth": depth,
                "include_files": include_files,
                "include_stats": include_stats,
                "format": export_format,
                "analysis_time": datetime.now().isoformat()
            }
        except (RuntimeError, ValueError) as e:
            return {
                "success": False,
                "error": str(e),
                "error_type": type(e).__name__,
                "folder_path": folder_path
            }

    @mcp_server.tool()
    async def folder_docs_evaluate_complexity(
        folder_path: str,
        detailed: bool = False,
        recommendations: bool = True
    ) -> Dict[str, Any]:
        """
        评估项目复杂度

        基于文件数量、目录深度、代码比例等因素评估项目复杂度。

        Args:
            folder_path: 要评估的项目路径
            detailed: 是否返回详细分析
            recommendations: 是否包含改进建议

        Returns:
            项目复杂度评估结果,包含复杂度等级、影响因素、建议等
        """
        try:
            # Security validation; the validated path is used from here on.
            folder_path = file_system.security_validator.validate_path(folder_path)

            complexity_result = await analysis_service.get_project_complexity(folder_path)

            # Collect the keys the caller did not ask for and drop them from
            # a copy, so the service's own dict is never modified.
            hidden_keys = set()
            if not detailed:
                hidden_keys.add('factors')
            if not recommendations:
                hidden_keys.add('recommendations')
            if hidden_keys:
                complexity_result = {
                    key: value
                    for key, value in complexity_result.items()
                    if key not in hidden_keys
                }

            return {
                "success": True,
                "folder_path": folder_path,
                "complexity": complexity_result,
                "evaluation_time": datetime.now().isoformat()
            }
        except (RuntimeError, ValueError) as e:
            return {
                "success": False,
                "error": str(e),
                "error_type": type(e).__name__,
                "folder_path": folder_path
            }

    @mcp_server.tool()
    async def folder_docs_generate_report(
        folder_path: str,
        report_type: str = "comprehensive",
        include_structure: bool = True,
        include_complexity: bool = True,
        include_recommendations: bool = True,
        output_file: str = ""
    ) -> Dict[str, Any]:
        """
        生成项目分析报告

        生成包含结构分析、复杂度评估和建议的综合报告。

        Args:
            folder_path: 要分析的项目路径
            report_type: 报告类型 (comprehensive, structure, complexity, summary)
            include_structure: 是否包含结构分析
            include_complexity: 是否包含复杂度评估
            include_recommendations: 是否包含改进建议
            output_file: 输出文件路径(可选)

        Returns:
            项目分析报告,包含各种分析结果和建议
        """
        try:
            # Security validation; the validated path is used from here on.
            folder_path = file_system.security_validator.validate_path(folder_path)

            report_data = {
                "project_path": folder_path,
                "report_type": report_type,
                "generation_time": datetime.now().isoformat(),
                "generator": "Optimized Folder Docs MCP Server v2.0.0"
            }

            # "summary" is listed for both sections: the summary renderer
            # reads structure statistics and the complexity evaluation, so
            # leaving it out produced a report with no data at all.
            if include_structure and report_type in ("comprehensive", "structure", "summary"):
                structure_analysis = await analysis_service.analyze_folder_structure(folder_path, 5)
                report_data["structure_analysis"] = structure_analysis

            if include_complexity and report_type in ("comprehensive", "complexity", "summary"):
                complexity_eval = await analysis_service.get_project_complexity(folder_path)
                if not include_recommendations:
                    # Honour the flag by omitting the recommendations from a
                    # copy (the service's dict is left untouched).
                    complexity_eval = {
                        key: value
                        for key, value in complexity_eval.items()
                        if key != 'recommendations'
                    }
                report_data["complexity_evaluation"] = complexity_eval

            report_content = await generate_report_content(report_data, report_type)

            # Optionally persist the report; the target path is validated too.
            if output_file:
                output_path = file_system.security_validator.validate_path(output_file)
                await file_system.write_file(output_path, report_content)
                saved_to = output_path
            else:
                saved_to = ""

            return {
                "success": True,
                "report_type": report_type,
                "folder_path": folder_path,
                "report_content": report_content,
                "saved_to": saved_to,
                "includes": {
                    "structure": include_structure,
                    "complexity": include_complexity,
                    "recommendations": include_recommendations
                }
            }
        except (RuntimeError, ValueError) as e:
            return {
                "success": False,
                "error": str(e),
                "error_type": type(e).__name__,
                "folder_path": folder_path
            }
def filter_files_from_analysis(analysis_result: Dict[str, Any]) -> Dict[str, Any]:
    """Return a copy of an analysis result with per-file details stripped.

    Nodes whose ``type`` is ``'file'`` are reduced to ``type``, ``name``,
    ``extension`` and ``file_type``. Nodes whose ``type`` is ``'folder'`` are
    shallow-copied and their ``items``, ``files`` and ``folders`` lists are
    filtered recursively. Any other node is returned unchanged.

    Args:
        analysis_result: Analysis dict; its ``structure`` entry, when
            present, is the root of the tree to filter.

    Returns:
        A shallow copy of ``analysis_result`` whose ``structure`` entry has
        been replaced by the filtered tree. The input is not modified. When
        there is no ``structure`` entry the copy is returned as is.
    """
    child_keys = ('items', 'files', 'folders')

    def filter_structure(node: Dict[str, Any]) -> Dict[str, Any]:
        node_type = node.get('type')
        if node_type == 'file':
            # Keep only the basic identification fields; a missing name
            # falls back to '' instead of raising KeyError.
            return {
                'type': 'file',
                'name': node.get('name', ''),
                'extension': node.get('extension', ''),
                'file_type': node.get('file_type', 'other'),
            }
        if node_type == 'folder':
            # Copy first so the caller's tree keeps its original lists.
            filtered = node.copy()
            for key in child_keys:
                if key in filtered:
                    filtered[key] = [filter_structure(child) for child in filtered[key]]
            return filtered
        return node

    filtered_result = analysis_result.copy()
    # The Markdown formatter treats 'structure' as optional; do the same
    # here rather than failing with KeyError.
    if 'structure' in analysis_result:
        filtered_result['structure'] = filter_structure(analysis_result['structure'])
    return filtered_result
async def format_analysis_as_yaml(analysis_result: Dict[str, Any]) -> str:
    """Serialize the analysis result as block-style YAML text.

    PyYAML is imported lazily. When the import fails, the returned string is
    an installation hint instead of YAML.

    Args:
        analysis_result: The analysis dict to serialize.

    Returns:
        The YAML document, or the installation hint when PyYAML is missing.
    """
    try:
        from yaml import dump as dump_yaml
        rendered = dump_yaml(
            analysis_result,
            allow_unicode=True,
            default_flow_style=False,
        )
    except ImportError:
        rendered = "需要安装PyYAML: pip install pyyaml"
    return rendered
async def format_analysis_as_markdown(analysis_result: Dict[str, Any]) -> str:
    """Render the analysis result as a Markdown document.

    The document has a title, a basic-information section, an optional
    statistics section (emitted only when ``statistics`` is non-empty, with
    a file-type breakdown when ``file_types`` is non-empty) and a structure
    overview produced by ``format_structure_as_markdown``.

    Args:
        analysis_result: Analysis dict; the keys ``name``, ``path``,
            ``analysis_time``, ``depth``, ``statistics`` and ``structure``
            are read, each with a default when absent.

    Returns:
        The Markdown text, lines joined with newlines.
    """
    title = analysis_result.get('name', '文件夹结构')
    doc = [
        f"# {title} 分析报告",
        "",
        # Basic information
        "## 📁 基本信息",
        f"- **路径**: `{analysis_result.get('path', '')}`",
        f"- **分析时间**: {analysis_result.get('analysis_time', '')}",
        f"- **分析深度**: {analysis_result.get('depth', 0)}",
        "",
    ]

    # Statistics (skipped entirely when there are none)
    statistics = analysis_result.get('statistics', {})
    if statistics:
        doc += [
            "## 📊 统计信息",
            f"- **总文件数**: {statistics.get('total_files', 0)}",
            f"- **总文件夹数**: {statistics.get('total_folders', 0)}",
            f"- **总大小**: {statistics.get('total_size_human', '0 B')}",
            "",
        ]

        # File-type distribution
        type_counts = statistics.get('file_types', {})
        if type_counts:
            doc.append("### 文件类型分布")
            doc.extend(f"- **{kind}**: {total}" for kind, total in type_counts.items())
            doc.append("")

    # Structure overview
    doc.append("## 🌳 结构概览")
    doc += format_structure_as_markdown(analysis_result.get('structure', {}), level=0)
    return "\n".join(doc)
def format_structure_as_markdown(structure: Dict[str, Any], level: int = 0) -> List[str]:
    """Render a folder node and its children as nested Markdown bullets.

    Only nodes whose ``type`` is ``'folder'`` produce output. The folder is
    rendered first, then its sub-folders (recursively), then its files. Each
    nesting level is indented by two spaces: with a single space per level,
    CommonMark treats every bullet as a sibling and the tree renders flat.

    Args:
        structure: Node dict with ``type``, ``name`` and optional
            ``folders``/``files`` lists.
        level: Nesting depth of ``structure``; 0 for the root.

    Returns:
        The Markdown lines for this subtree, or an empty list when the node
        is not a folder.
    """
    if structure.get('type') != 'folder':
        return []

    # Built once per call instead of once per file.
    icons = {
        'code': '📝',
        'documentation': '📚',
        'image': '🖼️',
        'config': '⚙️',
        'other': '📄'
    }
    indent = "  " * level
    child_indent = "  " * (level + 1)

    lines = [f"{indent}- 📁 **{structure.get('name', '')}**/"]

    # Sub-folders first; they indent themselves from the level they receive.
    for folder in structure.get('folders', []):
        lines.extend(format_structure_as_markdown(folder, level + 1))

    # Files sit at the same depth as the sub-folders.
    for file_info in structure.get('files', []):
        file_name = file_info.get('name', '')
        icon = icons.get(file_info.get('file_type', 'other'), '📄')
        file_size = file_info.get('size_human', '')
        size_info = f" ({file_size})" if file_size else ""
        lines.append(f"{child_indent}- {icon} {file_name}{size_info}")

    return lines
async def generate_report_content(report_data: Dict[str, Any], report_type: str) -> str:
    """Build the report text for the requested report type.

    Args:
        report_data: Collected report data passed through to the renderer.
        report_type: ``"summary"`` selects the summary renderer; every other
            value selects the comprehensive renderer.

    Returns:
        The rendered report text.
    """
    renderer = (
        generate_summary_report
        if report_type == "summary"
        else generate_comprehensive_report
    )
    return await renderer(report_data)
async def generate_summary_report(report_data: Dict[str, Any]) -> str:
    """Render the short Markdown summary of a project report.

    The header (project path and generation time) is always emitted. The
    statistics section is emitted when ``structure_analysis`` is present and
    the complexity section when ``complexity_evaluation`` is present.

    Args:
        report_data: Collected report data.

    Returns:
        The Markdown text, lines joined with newlines.
    """
    path_shown = report_data.get('project_path', '')
    out = [
        "# 项目摘要报告",
        "",
        f"**项目路径**: `{path_shown}`",
        f"**生成时间**: {report_data.get('generation_time', '')}",
        "",
    ]

    # Structure summary
    if 'structure_analysis' in report_data:
        figures = report_data['structure_analysis'].get('statistics', {})
        out += [
            "## 📊 项目统计",
            f"- **文件数量**: {figures.get('total_files', 0)}",
            f"- **文件夹数量**: {figures.get('total_folders', 0)}",
            f"- **项目大小**: {figures.get('total_size_human', '0 B')}",
            f"- **代码文件**: {figures.get('code_files', 0)}",
            f"- **文档文件**: {figures.get('doc_files', 0)}",
            "",
        ]

    # Complexity summary
    if 'complexity_evaluation' in report_data:
        evaluation = report_data['complexity_evaluation']
        out += [
            "## 🎯 复杂度评估",
            f"- **复杂度等级**: {evaluation.get('complexity_level', 'unknown')}",
            f"- **复杂度分数**: {evaluation.get('complexity_score', 0)}",
            f"- **评估描述**: {evaluation.get('description', '')}",
            "",
        ]

    return "\n".join(out)
async def generate_comprehensive_report(report_data: Dict[str, Any]) -> str:
    """Render the full Markdown analysis report.

    The header is always emitted. The structure section (statistics, then
    the file-type breakdown and the five first entries of ``largest_files``
    when those are non-empty) appears when ``structure_analysis`` is present.
    The complexity section (metrics, description, then factors and
    recommendations when those are non-empty) appears when
    ``complexity_evaluation`` is present.

    Args:
        report_data: Collected report data.

    Returns:
        The Markdown text, lines joined with newlines.
    """
    path_shown = report_data.get('project_path', '')
    out = [
        "# 项目综合分析报告",
        "",
        f"**项目路径**: `{path_shown}`",
        f"**报告类型**: {report_data.get('report_type', '')}",
        f"**生成时间**: {report_data.get('generation_time', '')}",
        f"**生成工具**: {report_data.get('generator', '')}",
        "",
    ]

    # Structure analysis section
    if 'structure_analysis' in report_data:
        figures = report_data['structure_analysis'].get('statistics', {})
        out += [
            "## 🌳 项目结构分析",
            # Statistics
            "### 📊 统计信息",
            f"- **总文件数**: {figures.get('total_files', 0)}",
            f"- **总文件夹数**: {figures.get('total_folders', 0)}",
            f"- **总大小**: {figures.get('total_size_human', '0 B')}",
            "",
        ]

        # File-type distribution
        type_counts = figures.get('file_types', {})
        if type_counts:
            out.append("### 📁 文件类型分布")
            out.extend(f"- **{kind}**: {total}" for kind, total in type_counts.items())
            out.append("")

        # Largest files (at most five)
        biggest = figures.get('largest_files', [])
        if biggest:
            out.append("### 📏 最大文件")
            out.extend(
                f"{rank}. **{entry.get('name', '')}** - {entry.get('size_human', '')}"
                for rank, entry in enumerate(biggest[:5], 1)
            )
            out.append("")

    # Complexity evaluation section
    if 'complexity_evaluation' in report_data:
        evaluation = report_data['complexity_evaluation']
        out += [
            "## 🎯 复杂度评估",
            "### 📈 复杂度指标",
            f"- **复杂度等级**: {evaluation.get('complexity_level', 'unknown')}",
            f"- **复杂度分数**: {evaluation.get('complexity_score', 0)}",
            f"- **评估时间**: {evaluation.get('analysis_timestamp', '')}",
            "",
            "### 📝 评估描述",
            evaluation.get('description', ''),
            "",
        ]

        # Contributing factors
        factor_levels = evaluation.get('factors', {})
        if factor_levels:
            out.append("### 🔍 影响因素")
            out.extend(f"- **{factor}**: {rating}" for factor, rating in factor_levels.items())
            out.append("")

        # Improvement suggestions
        suggestions = evaluation.get('recommendations', [])
        if suggestions:
            out.append("### 💡 改进建议")
            out.extend(f"{rank}. {tip}" for rank, tip in enumerate(suggestions, 1))
            out.append("")

    return "\n".join(out)