Skip to main content
Glama
analysis_tools.py16.7 kB
"""Analysis tools.

Provides folder-structure analysis, project complexity evaluation and
report-generation tools, registered on a FastMCP server.
"""

from __future__ import annotations

from datetime import datetime
from typing import TYPE_CHECKING, Any, Dict, List, Optional

if TYPE_CHECKING:
    # These project imports are only used in type annotations; with lazy
    # annotations (PEP 563, see `from __future__ import annotations` above)
    # they are not needed at runtime, which decouples this module from the
    # data-access and service layers at import time.
    from data_access import FileSystemInterface
    from services import AnalysisService


def register_analysis_tools(mcp_server,
                            analysis_service: AnalysisService,
                            file_system: FileSystemInterface):
    """Register the analysis-related MCP tools.

    Args:
        mcp_server: FastMCP server instance.
        analysis_service: Analysis service instance.
        file_system: File-system interface instance (provides the
            ``security_validator`` used for path validation).
    """

    @mcp_server.tool()
    async def folder_docs_analyze_structure(
        folder_path: str,
        depth: int = 3,
        include_files: bool = True,
        include_stats: bool = True,
        export_format: str = "json"
    ) -> Dict[str, Any]:
        """Analyze a folder's structure.

        Performs an in-depth analysis of the given folder, including file
        statistics and file-type distribution.

        Args:
            folder_path: Path of the folder to analyze.
            depth: Analysis depth (number of nesting levels).
            include_files: Whether to include per-file details.
            include_stats: Whether to include statistics.
            export_format: Export format (json, yaml, markdown).

        Returns:
            Folder-structure analysis result containing the hierarchy,
            statistics, etc., or an error payload with ``success=False``.
        """
        try:
            # Security validation: normalize the path and reject anything the
            # validator considers unsafe (raises RuntimeError/ValueError).
            folder_path = file_system.security_validator.validate_path(folder_path)

            # Run the analysis.
            analysis_result = await analysis_service.analyze_folder_structure(folder_path, depth)

            # Trim the result according to the caller's options.
            if not include_files:
                analysis_result = filter_files_from_analysis(analysis_result)
            if not include_stats:
                analysis_result.pop('statistics', None)

            # Format the output; unrecognized formats fall back to the raw dict.
            fmt = export_format.lower()
            if fmt == "yaml":
                formatted_result = await format_analysis_as_yaml(analysis_result)
            elif fmt == "markdown":
                formatted_result = await format_analysis_as_markdown(analysis_result)
            else:
                formatted_result = analysis_result

            return {
                "success": True,
                "folder_path": folder_path,
                "analysis": formatted_result,
                "depth": depth,
                "include_files": include_files,
                "include_stats": include_stats,
                "format": export_format,
                "analysis_time": datetime.now().isoformat()
            }
        except (RuntimeError, ValueError) as e:
            return {
                "success": False,
                "error": str(e),
                "error_type": type(e).__name__,
                "folder_path": folder_path
            }

    @mcp_server.tool()
    async def folder_docs_evaluate_complexity(
        folder_path: str,
        detailed: bool = False,
        recommendations: bool = True
    ) -> Dict[str, Any]:
        """Evaluate project complexity.

        Estimates project complexity based on factors such as file count,
        directory depth and code ratio.

        Args:
            folder_path: Path of the project to evaluate.
            detailed: Whether to return the detailed factor analysis.
            recommendations: Whether to include improvement recommendations.

        Returns:
            Complexity evaluation result with complexity level, contributing
            factors and recommendations, or an error payload.
        """
        try:
            # Security validation (raises on violation).
            folder_path = file_system.security_validator.validate_path(folder_path)

            # Run the complexity evaluation.
            complexity_result = await analysis_service.get_project_complexity(folder_path)

            # Trim the result according to the caller's options.
            if not detailed:
                complexity_result.pop('factors', None)
            if not recommendations:
                complexity_result.pop('recommendations', None)

            return {
                "success": True,
                "folder_path": folder_path,
                "complexity": complexity_result,
                "evaluation_time": datetime.now().isoformat()
            }
        except (RuntimeError, ValueError) as e:
            return {
                "success": False,
                "error": str(e),
                "error_type": type(e).__name__,
                "folder_path": folder_path
            }

    @mcp_server.tool()
    async def folder_docs_generate_report(
        folder_path: str,
        report_type: str = "comprehensive",
        include_structure: bool = True,
        include_complexity: bool = True,
        include_recommendations: bool = True,
        output_file: str = ""
    ) -> Dict[str, Any]:
        """Generate a project analysis report.

        Produces a report combining structure analysis, complexity evaluation
        and recommendations.

        Args:
            folder_path: Path of the project to analyze.
            report_type: Report type (comprehensive, structure, complexity,
                summary).
            include_structure: Whether to include the structure analysis.
            include_complexity: Whether to include the complexity evaluation.
            include_recommendations: Whether to include improvement
                recommendations.
            output_file: Optional path to save the report to.

        Returns:
            The generated report plus metadata, or an error payload.
        """
        try:
            # Security validation (raises on violation).
            folder_path = file_system.security_validator.validate_path(folder_path)

            report_data = {
                "project_path": folder_path,
                "report_type": report_type,
                "generation_time": datetime.now().isoformat(),
                "generator": "Optimized Folder Docs MCP Server v2.0.0"
            }

            # Structure analysis (fixed depth of 5 for reports).
            if include_structure and report_type in ["comprehensive", "structure"]:
                structure_analysis = await analysis_service.analyze_folder_structure(folder_path, 5)
                report_data["structure_analysis"] = structure_analysis

            # Complexity evaluation.
            if include_complexity and report_type in ["comprehensive", "complexity"]:
                complexity_eval = await analysis_service.get_project_complexity(folder_path)
                report_data["complexity_evaluation"] = complexity_eval

            # NOTE(review): include_recommendations is only echoed back in the
            # response below; it does not currently filter the report content.
            report_content = await generate_report_content(report_data, report_type)

            # Optionally persist the report to a (validated) file path.
            if output_file:
                output_path = file_system.security_validator.validate_path(output_file)
                await file_system.write_file(output_path, report_content)
                saved_to = output_path
            else:
                saved_to = ""

            return {
                "success": True,
                "report_type": report_type,
                "folder_path": folder_path,
                "report_content": report_content,
                "saved_to": saved_to,
                "includes": {
                    "structure": include_structure,
                    "complexity": include_complexity,
                    "recommendations": include_recommendations
                }
            }
        except (RuntimeError, ValueError) as e:
            return {
                "success": False,
                "error": str(e),
                "error_type": type(e).__name__,
                "folder_path": folder_path
            }


def filter_files_from_analysis(analysis_result: Dict[str, Any]) -> Dict[str, Any]:
    """Strip detailed file information from an analysis result.

    File nodes are reduced to their basic identity (type/name/extension/
    file_type); folder nodes are processed recursively. The input dict is
    not mutated.
    """

    def filter_structure(structure: Dict[str, Any]) -> Dict[str, Any]:
        if structure.get('type') == 'file':
            # Keep only the file's basic identity.
            return {
                'type': 'file',
                'name': structure['name'],
                'extension': structure.get('extension', ''),
                'file_type': structure.get('file_type', 'other')
            }
        if structure.get('type') == 'folder':
            # Recurse into whichever child collections are present.
            filtered = structure.copy()
            for key in ('items', 'files', 'folders'):
                if key in filtered:
                    filtered[key] = [filter_structure(item) for item in filtered[key]]
            return filtered
        return structure

    filtered_result = analysis_result.copy()
    # Robustness: tolerate results without a 'structure' key instead of
    # raising KeyError.
    if 'structure' in analysis_result:
        filtered_result['structure'] = filter_structure(analysis_result['structure'])
    return filtered_result


async def format_analysis_as_yaml(analysis_result: Dict[str, Any]) -> str:
    """Format an analysis result as YAML.

    Returns an installation hint string if PyYAML is not available
    (deliberate best-effort behavior, not an exception).
    """
    try:
        import yaml
        return yaml.dump(analysis_result, default_flow_style=False, allow_unicode=True)
    except ImportError:
        return "需要安装PyYAML: pip install pyyaml"


async def format_analysis_as_markdown(analysis_result: Dict[str, Any]) -> str:
    """Format an analysis result as a Markdown report."""
    lines = [f"# {analysis_result.get('name', '文件夹结构')} 分析报告"]
    lines.append("")

    # Basic information.
    lines.append("## 📁 基本信息")
    lines.append(f"- **路径**: `{analysis_result.get('path', '')}`")
    lines.append(f"- **分析时间**: {analysis_result.get('analysis_time', '')}")
    lines.append(f"- **分析深度**: {analysis_result.get('depth', 0)}")
    lines.append("")

    # Statistics.
    stats = analysis_result.get('statistics', {})
    if stats:
        lines.append("## 📊 统计信息")
        lines.append(f"- **总文件数**: {stats.get('total_files', 0)}")
        lines.append(f"- **总文件夹数**: {stats.get('total_folders', 0)}")
        lines.append(f"- **总大小**: {stats.get('total_size_human', '0 B')}")
        lines.append("")

        # File-type distribution.
        file_types = stats.get('file_types', {})
        if file_types:
            lines.append("### 文件类型分布")
            for file_type, count in file_types.items():
                lines.append(f"- **{file_type}**: {count}")
            lines.append("")

    # Structure overview.
    lines.append("## 🌳 结构概览")
    structure_lines = format_structure_as_markdown(analysis_result.get('structure', {}), level=0)
    lines.extend(structure_lines)

    return "\n".join(lines)


def format_structure_as_markdown(structure: Dict[str, Any], level: int = 0) -> List[str]:
    """Render a structure tree as indented Markdown list lines.

    Only 'folder' nodes produce output at the top level; files are rendered
    while walking their containing folder.
    """
    lines = []
    # NOTE(review): two-space indent per level reconstructed from the report
    # layout — confirm against original formatting.
    indent = "  " * level

    if structure.get('type') == 'folder':
        lines.append(f"{indent}- 📁 **{structure['name']}**/")

        # Sub-folders first, one level deeper.
        for folder in structure.get('folders', []):
            lines.extend(format_structure_as_markdown(folder, level + 1))

        # Then the folder's files, with a type icon and optional size.
        for file_info in structure.get('files', []):
            file_name = file_info['name']
            file_type = file_info.get('file_type', 'other')
            file_size = file_info.get('size_human', '')
            icons = {
                'code': '📝',
                'documentation': '📚',
                'image': '🖼️',
                'config': '⚙️',
                'other': '📄'
            }
            icon = icons.get(file_type, '📄')
            size_info = f" ({file_size})" if file_size else ""
            lines.append(f"{indent}  - {icon} {file_name}{size_info}")

    return lines


async def generate_report_content(report_data: Dict[str, Any], report_type: str) -> str:
    """Dispatch report generation by type: 'summary' or comprehensive."""
    if report_type == "summary":
        return await generate_summary_report(report_data)
    return await generate_comprehensive_report(report_data)


async def generate_summary_report(report_data: Dict[str, Any]) -> str:
    """Generate a short summary report (statistics + complexity only)."""
    lines = ["# 项目摘要报告"]
    lines.append("")

    project_path = report_data.get('project_path', '')
    lines.append(f"**项目路径**: `{project_path}`")
    lines.append(f"**生成时间**: {report_data.get('generation_time', '')}")
    lines.append("")

    # Structure summary.
    if 'structure_analysis' in report_data:
        stats = report_data['structure_analysis'].get('statistics', {})
        lines.append("## 📊 项目统计")
        lines.append(f"- **文件数量**: {stats.get('total_files', 0)}")
        lines.append(f"- **文件夹数量**: {stats.get('total_folders', 0)}")
        lines.append(f"- **项目大小**: {stats.get('total_size_human', '0 B')}")
        lines.append(f"- **代码文件**: {stats.get('code_files', 0)}")
        lines.append(f"- **文档文件**: {stats.get('doc_files', 0)}")
        lines.append("")

    # Complexity summary.
    if 'complexity_evaluation' in report_data:
        complexity = report_data['complexity_evaluation']
        lines.append("## 🎯 复杂度评估")
        lines.append(f"- **复杂度等级**: {complexity.get('complexity_level', 'unknown')}")
        lines.append(f"- **复杂度分数**: {complexity.get('complexity_score', 0)}")
        lines.append(f"- **评估描述**: {complexity.get('description', '')}")
        lines.append("")

    return "\n".join(lines)


async def generate_comprehensive_report(report_data: Dict[str, Any]) -> str:
    """Generate the full comprehensive report."""
    lines = ["# 项目综合分析报告"]
    lines.append("")

    project_path = report_data.get('project_path', '')
    lines.append(f"**项目路径**: `{project_path}`")
    lines.append(f"**报告类型**: {report_data.get('report_type', '')}")
    lines.append(f"**生成时间**: {report_data.get('generation_time', '')}")
    lines.append(f"**生成工具**: {report_data.get('generator', '')}")
    lines.append("")

    # Structure-analysis section.
    if 'structure_analysis' in report_data:
        lines.append("## 🌳 项目结构分析")
        structure = report_data['structure_analysis']

        # Statistics.
        stats = structure.get('statistics', {})
        lines.append("### 📊 统计信息")
        lines.append(f"- **总文件数**: {stats.get('total_files', 0)}")
        lines.append(f"- **总文件夹数**: {stats.get('total_folders', 0)}")
        lines.append(f"- **总大小**: {stats.get('total_size_human', '0 B')}")
        lines.append("")

        # File-type distribution.
        file_types = stats.get('file_types', {})
        if file_types:
            lines.append("### 📁 文件类型分布")
            for file_type, count in file_types.items():
                lines.append(f"- **{file_type}**: {count}")
            lines.append("")

        # Largest files (top 5).
        largest_files = stats.get('largest_files', [])
        if largest_files:
            lines.append("### 📏 最大文件")
            for i, file_info in enumerate(largest_files[:5], 1):
                lines.append(f"{i}. **{file_info.get('name', '')}** - {file_info.get('size_human', '')}")
            lines.append("")

    # Complexity-evaluation section.
    if 'complexity_evaluation' in report_data:
        lines.append("## 🎯 复杂度评估")
        complexity = report_data['complexity_evaluation']

        lines.append("### 📈 复杂度指标")
        lines.append(f"- **复杂度等级**: {complexity.get('complexity_level', 'unknown')}")
        lines.append(f"- **复杂度分数**: {complexity.get('complexity_score', 0)}")
        lines.append(f"- **评估时间**: {complexity.get('analysis_timestamp', '')}")
        lines.append("")

        lines.append("### 📝 评估描述")
        lines.append(complexity.get('description', ''))
        lines.append("")

        # Contributing factors.
        factors = complexity.get('factors', {})
        if factors:
            lines.append("### 🔍 影响因素")
            for factor, level in factors.items():
                lines.append(f"- **{factor}**: {level}")
            lines.append("")

        # Improvement recommendations.
        recommendations = complexity.get('recommendations', [])
        if recommendations:
            lines.append("### 💡 改进建议")
            for i, rec in enumerate(recommendations, 1):
                lines.append(f"{i}. {rec}")
            lines.append("")

    return "\n".join(lines)

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/kscz0000/Zhiwen-Assistant-MCP'

If you have feedback or need assistance with the MCP directory API, please join our Discord server