batch_tools.py
""" 批量处理工具 提供批量生成文档、批量更新等操作。 """ import asyncio from typing import Dict, Any, List, Optional from datetime import datetime from data_access import FileSystemInterface from services import DocumentService, AnalysisService def register_batch_tools(mcp_server, document_service: DocumentService, analysis_service: AnalysisService, file_system: FileSystemInterface): """ 注册批量处理相关工具 Args: mcp_server: FastMCP服务器实例 document_service: 文档服务实例 analysis_service: 分析服务实例 file_system: 文件系统接口实例 """ @mcp_server.tool() async def folder_docs_batch_generate_readme( root_path: str, recursive: bool = True, max_depth: int = 5, template_name: str = "readme_simple.md.j2", overwrite: bool = False, dry_run: bool = False, exclude_patterns: List[str] = None ) -> Dict[str, Any]: """ 批量生成文件夹README文件 批量为指定目录下的所有文件夹生成README文档。 Args: root_path: 根目录路径 recursive: 是否递归处理子目录 max_depth: 最大递归深度 template_name: 使用的模板名称 overwrite: 是否覆盖已存在的README文件 dry_run: 预演模式,不实际写入文件 exclude_patterns: 排除的文件夹模式列表 Returns: 批量生成结果,包含成功、失败统计和详细信息 """ try: if exclude_patterns is None: exclude_patterns = [] # 安全验证 root_path = file_system.security_validator.validate_path(root_path) # 收集所有要处理的文件夹 folders_to_process = await collect_folders( root_path, recursive, max_depth, exclude_patterns, file_system ) if not folders_to_process: return { "success": True, "message": "没有找到符合条件的文件夹", "folders_processed": 0, "results": [] } # 批量处理 results = await batch_process_folders( folders_to_process, document_service, template_name, overwrite, dry_run, file_system ) # 统计结果 successful = len([r for r in results if r["success"]]) failed = len([r for r in results if not r["success"]]) skipped = len([r for r in results if r.get("skipped", False)]) return { "success": True, "root_path": root_path, "total_folders": len(folders_to_process), "successful": successful, "failed": failed, "skipped": skipped, "dry_run": dry_run, "results": results, "processing_time": datetime.now().isoformat() } except (RuntimeError, ValueError) as e: return { "success": False, "error": str(e), "error_type": type(e).__name__, "root_path": root_path } @mcp_server.tool() async def folder_docs_batch_update_documentation( root_path: str, update_readme: bool = True, update_mindmap: bool = False, mindmap_format: str = "mermaid", recursive: bool = True, max_depth: int = 3, force_update: bool = False, dry_run: bool = False ) -> Dict[str, Any]: """ 批量更新项目文档 批量更新指定目录下的README和思维导图文档。 Args: root_path: 根目录路径 update_readme: 是否更新README文件 update_mindmap: 是否更新思维导图文件 mindmap_format: 思维导图格式 recursive: 是否递归处理 max_depth: 最大递归深度 force_update: 是否强制更新(忽略修改时间) dry_run: 预演模式 Returns: 批量更新结果 """ try: # 安全验证 root_path = file_system.security_validator.validate_path(root_path) # 收集所有要处理的文件夹 folders_to_process = await collect_folders( root_path, recursive, max_depth, [], file_system ) if not folders_to_process: return { "success": True, "message": "没有找到符合条件的文件夹", "folders_processed": 0, "results": [] } # 批量更新文档 results = [] for folder_path in folders_to_process: folder_result = { "folder_path": folder_path, "readme_updated": False, "mindmap_updated": False, "success": True, "errors": [] } try: # 更新README if update_readme: readme_result = await document_service.generate_readme( folder_path, force_update=force_update, dry_run=dry_run ) folder_result["readme_updated"] = readme_result.get("success", False) if not readme_result.get("success", False): folder_result["errors"].append(f"README更新失败: {readme_result.get('error')}") # 更新思维导图 if update_mindmap: from .mindmap_tools import folder_docs_generate_mindmap mindmap_result = await 
folder_docs_generate_mindmap( folder_path, mindmap_format, 5, True, "" ) folder_result["mindmap_updated"] = mindmap_result.get("success", False) if not mindmap_result.get("success", False): folder_result["errors"].append(f"思维导图更新失败: {mindmap_result.get('error')}") except (RuntimeError, ValueError) as e: folder_result["success"] = False folder_result["errors"].append(str(e)) results.append(folder_result) # 统计结果 successful = len([r for r in results if r["success"]]) readme_updated = len([r for r in results if r["readme_updated"]]) mindmap_updated = len([r for r in results if r["mindmap_updated"]]) return { "success": True, "root_path": root_path, "total_folders": len(folders_to_process), "successful": successful, "readme_updated": readme_updated, "mindmap_updated": mindmap_updated, "dry_run": dry_run, "results": results, "processing_time": datetime.now().isoformat() } except (RuntimeError, ValueError) as e: return { "success": False, "error": str(e), "error_type": type(e).__name__, "root_path": root_path } async def collect_folders(root_path: str, recursive: bool, max_depth: int, exclude_patterns: List[str], file_system: FileSystemInterface) -> List[str]: """收集所有要处理的文件夹""" folders = [] try: from pathlib import Path import fnmatch root_dir = Path(root_path) if not root_dir.exists() or not root_dir.is_dir(): return folders def should_exclude(folder_path: str) -> bool: """检查是否应该排除该文件夹""" folder_name = Path(folder_path).name for pattern in exclude_patterns: if fnmatch.fnmatch(folder_name, pattern): return True return False async def scan_folder(folder_path: Path, current_depth: int = 0): """递归扫描文件夹""" if current_depth > max_depth: return # 检查是否应该排除 if should_exclude(str(folder_path)): return # 添加到处理列表(排除根目录) if current_depth > 0: folders.append(str(folder_path)) # 如果需要递归,继续扫描子目录 if recursive and current_depth < max_depth: try: subfolders = [item for item in folder_path.iterdir() if item.is_dir()] # 并发扫描子文件夹 tasks = [scan_folder(subfolder, current_depth + 1) for subfolder in subfolders] await asyncio.gather(*tasks, return_exceptions=True) except (PermissionError, OSError): # 忽略权限错误 pass # 开始扫描 await scan_folder(root_dir) # 按路径排序 folders.sort() except (RuntimeError, TypeError, ValueError) as e: # 如果扫描失败,至少返回根目录 folders = [root_path] if recursive else [] return folders async def batch_process_folders(folders: List[str], document_service: DocumentService, template_name: str, overwrite: bool, dry_run: bool, file_system: FileSystemInterface) -> List[Dict[str, Any]]: """批量处理文件夹""" # 创建信号量控制并发数 semaphore = asyncio.Semaphore(5) # 最多同时处理5个文件夹 async def process_single_folder(folder_path: str) -> Dict[str, Any]: async with semaphore: try: # 生成README result = await document_service.generate_readme( folder_path, template_name, overwrite, dry_run ) return { "folder_path": folder_path, "success": result.get("success", False), "message": result.get("message", ""), "error": result.get("error"), "skipped": result.get("skipped", False), "dry_run": dry_run } except (RuntimeError, ValueError) as e: return { "folder_path": folder_path, "success": False, "error": str(e), "error_type": type(e).__name__, "dry_run": dry_run } # 并发处理所有文件夹 tasks = [process_single_folder(folder) for folder in folders] results = await asyncio.gather(*tasks, return_exceptions=True) # 处理异常结果 processed_results = [] for i, result in enumerate(results): if isinstance(result, Exception): processed_results.append({ "folder_path": folders[i], "success": False, "error": str(result), "error_type": type(result).__name__, "dry_run": dry_run }) else: 
processed_results.append(result) return processed_results
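For context, here is a minimal wiring sketch showing how register_batch_tools might be hooked into a FastMCP server at startup. The fastmcp import path, the server name, and the zero-argument service constructors are assumptions for illustration; the actual initialization of DocumentService, AnalysisService, and FileSystemInterface in this project may differ.

# server.py: hypothetical wiring sketch, not part of this repository.
from fastmcp import FastMCP

from data_access import FileSystemInterface
from services import AnalysisService, DocumentService
from batch_tools import register_batch_tools

mcp = FastMCP("folder-docs")  # assumed server name

# Assumed zero-argument constructors; the real ones may take configuration.
file_system = FileSystemInterface()
document_service = DocumentService()
analysis_service = AnalysisService()

# Attaches folder_docs_batch_generate_readme and
# folder_docs_batch_update_documentation as MCP tools.
register_batch_tools(mcp, document_service, analysis_service, file_system)

if __name__ == "__main__":
    mcp.run()

Once registered, an MCP client can invoke the tools by name. Note the design choice in batch_process_folders: the asyncio.Semaphore(5) caps README generation at five concurrent folders no matter how large the collected folder list is, which keeps file-system and template load bounded.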
